[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4939


---


[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r151110083
  
--- Diff: flink-yarn/pom.xml ---
@@ -153,6 +159,63 @@ under the License.
 
 
 
+		<profile>
+			<id>include_hadoop_aws</id>
+			<activation>
+				<property>
+					<name>include_hadoop_aws</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-aws</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+					<exclusions>
+						<exclusion>
+							<groupId>org.apache.avro</groupId>
+							<artifactId>avro</artifactId>
+						</exclusion>
--- End diff --

IIRC avro was excluded since it was just not necessary for the file-system, 
in which case it should stay.


---


[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-10 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r150275387
  
--- Diff: flink-yarn/pom.xml ---
@@ -153,6 +159,63 @@ under the License.
 
 
 
+		<profile>
+			<id>include_hadoop_aws</id>
+			<activation>
+				<property>
+					<name>include_hadoop_aws</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-aws</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+					<exclusions>
+						<exclusion>
+							<groupId>org.apache.avro</groupId>
+							<artifactId>avro</artifactId>
+						</exclusion>
--- End diff --

makes sense, I guess


---


[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-10 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r150275161
  
--- Diff: flink-yarn/pom.xml ---
@@ -153,6 +159,63 @@ under the License.
 
 
 
+		<profile>
+			<id>include_hadoop_aws</id>
+			<activation>
+				<property>
+					<name>include_hadoop_aws</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-aws</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+					<exclusions>
+						<exclusion>
+							<groupId>org.apache.avro</groupId>
+							<artifactId>avro</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-annotations</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-core</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-databind</artifactId>
+						</exclusion>
--- End diff --

this dependency is, however, somehow related to `com.amazonaws`, not our 
code...


---


[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-09 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r149980302
  
--- Diff: flink-yarn/pom.xml ---
@@ -153,6 +159,63 @@ under the License.
 
 
 
+		<profile>
+			<id>include_hadoop_aws</id>
+			<activation>
+				<property>
+					<name>include_hadoop_aws</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-aws</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+					<exclusions>
+						<exclusion>
+							<groupId>org.apache.avro</groupId>
+							<artifactId>avro</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-annotations</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-core</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-databind</artifactId>
+						</exclusion>
--- End diff --

Can't we enforce jackson 2.6 via dependency management? I think this would 
be cleaner than excluding the dependencies here and assuming that 
`aws-java-sdk-s3` pulls in the missing dependencies.
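
As an illustration, such a dependencyManagement entry could look as follows 
(a sketch only; the concrete 2.6.x patch version is an assumption, not taken 
from the PR):

    <dependencyManagement>
        <dependencies>
            <!-- pin the jackson version required by the AWS SDK;
                 2.6.7 is illustrative -->
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-core</artifactId>
                <version>2.6.7</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>2.6.7</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-annotations</artifactId>
                <version>2.6.7</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

With such an entry, the per-dependency exclusions above would become 
unnecessary since Maven would resolve all three artifacts to the managed 
version.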


---


[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-09 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r149983029
  
--- Diff: flink-yarn/pom.xml ---
@@ -153,6 +159,63 @@ under the License.
 
 
 
+		<profile>
+			<id>include_hadoop_aws</id>
+			<activation>
+				<property>
+					<name>include_hadoop_aws</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-aws</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+					<exclusions>
+						<exclusion>
+							<groupId>org.apache.avro</groupId>
+							<artifactId>avro</artifactId>
+						</exclusion>
--- End diff --

Shouldn't we override the avro version instead of excluding it?


---


[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-09 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r149983025
  
--- Diff: flink-yarn/pom.xml ---
@@ -153,6 +159,63 @@ under the License.
 
 
 
+		<profile>
+			<id>include_hadoop_aws</id>
+			<activation>
+				<property>
+					<name>include_hadoop_aws</name>
+				</property>
+			</activation>
+			<dependencies>
+				<dependency>
+					<groupId>org.apache.hadoop</groupId>
+					<artifactId>hadoop-aws</artifactId>
+					<version>${hadoop.version}</version>
+					<scope>test</scope>
+					<exclusions>
+						<exclusion>
+							<groupId>org.apache.avro</groupId>
+							<artifactId>avro</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-annotations</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-core</artifactId>
+						</exclusion>
+						<exclusion>
+							<groupId>com.fasterxml.jackson.core</groupId>
+							<artifactId>jackson-databind</artifactId>
+						</exclusion>
--- End diff --

It might then be necessary to add these dependencies explicitly here.


---


[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r149084279
  
--- Diff: flink-yarn/pom.xml ---
@@ -99,6 +99,13 @@ under the License.
 
 
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-s3-fs-hadoop</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
--- End diff --

I think it's also ok this way :-)


---


[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r149083811
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---
@@ -117,27 +118,50 @@ public static void setupYarnClassPath(Configuration conf, Map<String, String> ap
 	}
 
 	/**
+	 * Copy a local file to a remote file system.
+	 *
+	 * @param fs
+	 * 		remote filesystem
+	 * @param appId
+	 * 		application ID
+	 * @param localRsrcPath
+	 * 		path to the local file
+	 * @param homedir
+	 * 		remote home directory base (will be extended)
+	 * @param relativeTargetPath
+	 * 		relative target path of the file (will be prefixed by the full home directory we set up)
+	 *
 	 * @return Path to remote file (usually hdfs)
-	 * @throws IOException
 	 */
-	public static Path setupLocalResource(
-		FileSystem fs,
-		String appId, Path localRsrcPath,
-		LocalResource appMasterJar,
-		Path homedir) throws IOException {
+	static Tuple2<Path, LocalResource> setupLocalResource(
+		FileSystem fs,
+		String appId,
+		Path localRsrcPath,
+		Path homedir,
+		String relativeTargetPath) throws IOException {
+
+		if (new File(localRsrcPath.toUri().getPath()).isDirectory()) {
+			throw new IllegalArgumentException("File to copy must not be a directory: " +
+				localRsrcPath);
+		}
 
 		// copy resource to HDFS
-		String suffix = ".flink/" + appId + "/" + localRsrcPath.getName();
+		String suffix = ".flink/" + appId + "/" + relativeTargetPath + "/" + localRsrcPath.getName();
--- End diff --

you could also check if the `relativeTargetPath` is empty. I would opt for 
the safe approach since I'm not sure whether this behaviour is specific to a 
Hadoop version or not.
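
A minimal sketch of that check (assuming an empty string means "no relative 
path"; this is not the PR's final code):

    // guard against an empty relative target path, which would otherwise
    // produce ".flink/<appId>//<fileName>" as the suffix
    final String suffix;
    if (relativeTargetPath == null || relativeTargetPath.isEmpty()) {
        suffix = ".flink/" + appId + "/" + localRsrcPath.getName();
    } else {
        suffix = ".flink/" + appId + "/" + relativeTargetPath + "/" + localRsrcPath.getName();
    }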


---


[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-06 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r149078183
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---
@@ -408,10 +437,12 @@ static ContainerLaunchContext createTaskExecutorContext(
 		// prepare additional files to be shipped
 		for (String pathStr : shipListString.split(",")) {
 			if (!pathStr.isEmpty()) {
+				String[] pathWithKey = pathStr.split("=");
--- End diff --

yes, if you read it as the order of the elements in the array - I'll 
adapt the name


---


[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-06 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r149077720
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---
@@ -117,27 +118,50 @@ public static void setupYarnClassPath(Configuration conf, Map<String, String> ap
 	}
 
 	/**
+	 * Copy a local file to a remote file system.
+	 *
+	 * @param fs
+	 * 		remote filesystem
+	 * @param appId
+	 * 		application ID
+	 * @param localRsrcPath
+	 * 		path to the local file
+	 * @param homedir
+	 * 		remote home directory base (will be extended)
+	 * @param relativeTargetPath
+	 * 		relative target path of the file (will be prefixed by the full home directory we set up)
+	 *
 	 * @return Path to remote file (usually hdfs)
-	 * @throws IOException
 	 */
-	public static Path setupLocalResource(
-		FileSystem fs,
-		String appId, Path localRsrcPath,
-		LocalResource appMasterJar,
-		Path homedir) throws IOException {
+	static Tuple2<Path, LocalResource> setupLocalResource(
+		FileSystem fs,
+		String appId,
+		Path localRsrcPath,
+		Path homedir,
+		String relativeTargetPath) throws IOException {
+
+		if (new File(localRsrcPath.toUri().getPath()).isDirectory()) {
+			throw new IllegalArgumentException("File to copy must not be a directory: " +
+				localRsrcPath);
+		}
 
 		// copy resource to HDFS
-		String suffix = ".flink/" + appId + "/" + localRsrcPath.getName();
+		String suffix = ".flink/" + appId + "/" + relativeTargetPath + "/" + localRsrcPath.getName();
 
 		Path dst = new Path(homedir, suffix);
 
 		LOG.info("Copying from " + localRsrcPath + " to " + dst);
-		fs.copyFromLocalFile(localRsrcPath, dst);
-		registerLocalResource(fs, dst, appMasterJar);
-		return dst;
+
+		fs.copyFromLocalFile(false, true, localRsrcPath, dst);
+
+		// now create the resource instance
+		LocalResource resource = Records.newRecord(LocalResource.class);
+		registerLocalResource(fs, dst, resource);
+		return Tuple2.of(dst, resource);
 	}
 
-	public static void registerLocalResource(FileSystem fs, Path remoteRsrcPath, LocalResource localResource) throws IOException {
+	private static void registerLocalResource(
--- End diff --

oh yes - totally...


---


[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-06 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r149077620
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---
@@ -117,27 +118,50 @@ public static void setupYarnClassPath(Configuration conf, Map<String, String> ap
 	}
 
 	/**
+	 * Copy a local file to a remote file system.
+	 *
+	 * @param fs
+	 * 		remote filesystem
+	 * @param appId
+	 * 		application ID
+	 * @param localRsrcPath
+	 * 		path to the local file
+	 * @param homedir
+	 * 		remote home directory base (will be extended)
+	 * @param relativeTargetPath
+	 * 		relative target path of the file (will be prefixed by the full home directory we set up)
+	 *
 	 * @return Path to remote file (usually hdfs)
-	 * @throws IOException
 	 */
-	public static Path setupLocalResource(
-		FileSystem fs,
-		String appId, Path localRsrcPath,
-		LocalResource appMasterJar,
-		Path homedir) throws IOException {
+	static Tuple2<Path, LocalResource> setupLocalResource(
+		FileSystem fs,
+		String appId,
+		Path localRsrcPath,
+		Path homedir,
+		String relativeTargetPath) throws IOException {
+
+		if (new File(localRsrcPath.toUri().getPath()).isDirectory()) {
+			throw new IllegalArgumentException("File to copy must not be a directory: " +
+				localRsrcPath);
+		}
 
 		// copy resource to HDFS
-		String suffix = ".flink/" + appId + "/" + localRsrcPath.getName();
+		String suffix = ".flink/" + appId + "/" + relativeTargetPath + "/" + localRsrcPath.getName();
--- End diff --

Yes, but apparently this is filtered out by `new Path(homedir, suffix);` 
and the right path is created. I stumbled upon this as well and 
double-checked the results. The alternative is, for example, using `null` if no 
relative path is desired and setting the `suffix` appropriately, which is a bit 
uglier but may also be cleaner/safer...
Would you rather go this way?
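
For reference, a quick demonstration of that filtering (a sketch; the host 
name and paths are made up):

    // Hadoop's Path constructor normalizes the child path, collapsing the
    // duplicate slash left behind by an empty relativeTargetPath:
    Path homedir = new Path("hdfs://namenode:8020/user/flink");
    Path dst = new Path(homedir, ".flink/app_1//flink-dist.jar");
    // dst -> hdfs://namenode:8020/user/flink/.flink/app_1/flink-dist.jar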


---


[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-06 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r149076237
  
--- Diff: flink-yarn/pom.xml ---
@@ -99,6 +99,13 @@ under the License.
 
 
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-s3-fs-hadoop</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
--- End diff --

We could...
I didn't put the test there because the project's pom.xml says `There is a 
separate "flink-yarn-tests" package that expects the "flink-dist" package to be 
built before.`, which we do not need for this test.

whatever you prefer...


---


[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r149070982
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---
@@ -117,27 +118,50 @@ public static void setupYarnClassPath(Configuration conf, Map<String, String> ap
 	}
 
 	/**
+	 * Copy a local file to a remote file system.
+	 *
+	 * @param fs
+	 * 		remote filesystem
+	 * @param appId
+	 * 		application ID
+	 * @param localRsrcPath
+	 * 		path to the local file
+	 * @param homedir
+	 * 		remote home directory base (will be extended)
+	 * @param relativeTargetPath
+	 * 		relative target path of the file (will be prefixed by the full home directory we set up)
+	 *
 	 * @return Path to remote file (usually hdfs)
-	 * @throws IOException
 	 */
-	public static Path setupLocalResource(
-		FileSystem fs,
-		String appId, Path localRsrcPath,
-		LocalResource appMasterJar,
-		Path homedir) throws IOException {
+	static Tuple2<Path, LocalResource> setupLocalResource(
+		FileSystem fs,
+		String appId,
+		Path localRsrcPath,
+		Path homedir,
+		String relativeTargetPath) throws IOException {
+
+		if (new File(localRsrcPath.toUri().getPath()).isDirectory()) {
+			throw new IllegalArgumentException("File to copy must not be a directory: " +
+				localRsrcPath);
+		}
 
 		// copy resource to HDFS
-		String suffix = ".flink/" + appId + "/" + localRsrcPath.getName();
+		String suffix = ".flink/" + appId + "/" + relativeTargetPath + "/" + localRsrcPath.getName();
 
 		Path dst = new Path(homedir, suffix);
 
 		LOG.info("Copying from " + localRsrcPath + " to " + dst);
-		fs.copyFromLocalFile(localRsrcPath, dst);
-		registerLocalResource(fs, dst, appMasterJar);
-		return dst;
+
+		fs.copyFromLocalFile(false, true, localRsrcPath, dst);
+
+		// now create the resource instance
+		LocalResource resource = Records.newRecord(LocalResource.class);
+		registerLocalResource(fs, dst, resource);
+		return Tuple2.of(dst, resource);
 	}
 
-	public static void registerLocalResource(FileSystem fs, Path remoteRsrcPath, LocalResource localResource) throws IOException {
+	private static void registerLocalResource(
+		FileSystem fs, Path remoteRsrcPath, LocalResource localResource) throws IOException {
--- End diff --

Either put every parameter on a different line or please revert this change.


---


[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r149069663
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---
@@ -981,25 +998,54 @@ public ApplicationReport startAppMaster(
 		return report;
 	}
 
-	private static List<String> uploadAndRegisterFiles(
-		Collection<File> shipFiles,
+	/**
+	 * Uploads and registers a single resource and adds it to localResources.
+	 *
+	 * @param key
+	 * 		the key to add the resource under
+	 * @param fs
+	 * 		the remote file system to upload to
+	 * @param appId
+	 * 		application ID
+	 * @param localRsrcPath
+	 * 		local path to the file
+	 * @param localResources
+	 * 		map of resources
+	 *
+	 * @return the remote path to the uploaded resource
+	 */
+	private static Path setupSingleLocalResource(
+		String key,
 		FileSystem fs,
-		String appId,
-		List<Path> remotePaths,
+		ApplicationId appId,
+		Path localRsrcPath,
--- End diff --

Typo `localSrcPath`?


---


[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r149069107
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---
@@ -705,11 +707,12 @@ public ApplicationReport startAppMaster(
 		StringBuilder envShipFileList = new StringBuilder();
 
 		// upload and register ship files
-		List<String> systemClassPaths = uploadAndRegisterFiles(systemShipFiles, fs, appId.toString(), paths, localResources, envShipFileList);
+		List<String> systemClassPaths = uploadAndRegisterFiles(systemShipFiles, fs,
+			homeDir, appId, paths, localResources, envShipFileList);
--- End diff --

nit: I would either not break lines or break after each argument.


---


[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r149071810
  
--- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.util.OperatingSystem;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Tests for verifying file staging during submission to YARN works.
+ */
+public class YarnFileStageTest {
+
+	@ClassRule
+	public static final TemporaryFolder CLASS_TEMP_DIR = new TemporaryFolder();
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	private static MiniDFSCluster hdfsCluster;
+
+	private static Path hdfsRootPath;
+
+	private org.apache.hadoop.conf.Configuration hadoopConfig;
+
+	// ------------------------------------------------------------------------
+	//  Test setup and shutdown
+	// ------------------------------------------------------------------------
+
+	@BeforeClass
+	public static void createHDFS() throws Exception {
+		Assume.assumeTrue(!OperatingSystem.isWindows());
+
+		final File tempDir = CLASS_TEMP_DIR.newFolder();
+
+		org.apache.hadoop.conf.Configuration hdConf = new org.apache.hadoop.conf.Configuration();
+		hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tempDir.getAbsolutePath());
+
+		MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+		hdfsCluster = builder.build();
+		hdfsRootPath = new Path(hdfsCluster.getURI());
+	}
+
+	@AfterClass
+	public static void destroyHDFS() {
+		if (hdfsCluster != null) {
+			hdfsCluster.shutdown();
+		}
+		hdfsCluster = null;
+		hdfsRootPath = null;
+	}
+
+	@Before
+	public void initConfig() {
+		hadoopConfig = new org.apache.hadoop.conf.Configuration();
+		hadoopConfig.set(org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY, hdfsRootPath.toString());
+	}
+
+	/**
+	 * Verifies that nested directories are properly copied with a hdfs:// file
+	 * system (from a file:///absolute/path source path).
+	 */
+	@Test
+	public void testCopyFromLocalRecursiveWithScheme() throws Exception {
+		final FileSystem targetFileSystem = hdfsRootPath.getFileSystem(hadoopConfig);
+		final Path targetDir = targetFileSystem.getWorkingDirectory();
+
+		testCopyFromLocalRecursive(targetFileSystem, targetDir, tempFolder, true);
+	}
+
+	/**
+	 * Verifies that nested directories are properly copied with a hdfs:// file
+	 *


---

[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r149070583
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---
@@ -117,27 +118,50 @@ public static void setupYarnClassPath(Configuration conf, Map<String, String> ap
 	}
 
 	/**
+	 * Copy a local file to a remote file system.
+	 *
+	 * @param fs
+	 * 		remote filesystem
+	 * @param appId
+	 * 		application ID
+	 * @param localRsrcPath
+	 * 		path to the local file
+	 * @param homedir
+	 * 		remote home directory base (will be extended)
+	 * @param relativeTargetPath
+	 * 		relative target path of the file (will be prefixed by the full home directory we set up)
+	 *
 	 * @return Path to remote file (usually hdfs)
-	 * @throws IOException
 	 */
-	public static Path setupLocalResource(
-		FileSystem fs,
-		String appId, Path localRsrcPath,
-		LocalResource appMasterJar,
-		Path homedir) throws IOException {
+	static Tuple2<Path, LocalResource> setupLocalResource(
+		FileSystem fs,
+		String appId,
+		Path localRsrcPath,
--- End diff --

`localSrcPath`?


---


[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r149073723
  
--- Diff: flink-yarn/pom.xml ---
@@ -99,6 +99,13 @@ under the License.
 
 
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-s3-fs-hadoop</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
--- End diff --

We could move the S3 upload test to `flink-yarn-tests`. That way we would 
only add this dependency to `flink-yarn-tests`.


---


[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r149071491
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---
@@ -117,27 +118,50 @@ public static void setupYarnClassPath(Configuration conf, Map<String, String> ap
 	}
 
 	/**
+	 * Copy a local file to a remote file system.
+	 *
+	 * @param fs
+	 * 		remote filesystem
+	 * @param appId
+	 * 		application ID
+	 * @param localRsrcPath
+	 * 		path to the local file
+	 * @param homedir
+	 * 		remote home directory base (will be extended)
+	 * @param relativeTargetPath
+	 * 		relative target path of the file (will be prefixed by the full home directory we set up)
+	 *
 	 * @return Path to remote file (usually hdfs)
-	 * @throws IOException
 	 */
-	public static Path setupLocalResource(
-		FileSystem fs,
-		String appId, Path localRsrcPath,
-		LocalResource appMasterJar,
-		Path homedir) throws IOException {
+	static Tuple2<Path, LocalResource> setupLocalResource(
+		FileSystem fs,
+		String appId,
+		Path localRsrcPath,
+		Path homedir,
+		String relativeTargetPath) throws IOException {
+
+		if (new File(localRsrcPath.toUri().getPath()).isDirectory()) {
+			throw new IllegalArgumentException("File to copy must not be a directory: " +
+				localRsrcPath);
+		}
 
 		// copy resource to HDFS
-		String suffix = ".flink/" + appId + "/" + localRsrcPath.getName();
+		String suffix = ".flink/" + appId + "/" + relativeTargetPath + "/" + localRsrcPath.getName();
 
 		Path dst = new Path(homedir, suffix);
 
 		LOG.info("Copying from " + localRsrcPath + " to " + dst);
-		fs.copyFromLocalFile(localRsrcPath, dst);
-		registerLocalResource(fs, dst, appMasterJar);
-		return dst;
+
+		fs.copyFromLocalFile(false, true, localRsrcPath, dst);
+
+		// now create the resource instance
+		LocalResource resource = Records.newRecord(LocalResource.class);
+		registerLocalResource(fs, dst, resource);
+		return Tuple2.of(dst, resource);
 	}
 
-	public static void registerLocalResource(FileSystem fs, Path remoteRsrcPath, LocalResource localResource) throws IOException {
+	private static void registerLocalResource(
--- End diff --

Touching this code anyway: could we change it so that this method creates 
and returns the `LocalResource`?
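
One possible shape of that refactoring (a sketch derived from the 
surrounding diff, not the merged code):

    private static LocalResource registerLocalResource(FileSystem fs, Path remoteRsrcPath) throws IOException {
        // build the LocalResource here instead of mutating a caller-provided one
        LocalResource localResource = Records.newRecord(LocalResource.class);
        FileStatus jarStat = fs.getFileStatus(remoteRsrcPath);
        localResource.setResource(ConverterUtils.getYarnUrlFromURI(remoteRsrcPath.toUri()));
        localResource.setSize(jarStat.getLen());
        localResource.setTimestamp(jarStat.getModificationTime());
        localResource.setType(LocalResourceType.FILE);
        localResource.setVisibility(LocalResourceVisibility.APPLICATION);
        return localResource;
    }

The caller in setupLocalResource would then shrink to a single line: 
`LocalResource resource = registerLocalResource(fs, dst);`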


---


[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r149070829
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---
@@ -117,27 +118,50 @@ public static void setupYarnClassPath(Configuration conf, Map<String, String> ap
 	}
 
 	/**
+	 * Copy a local file to a remote file system.
+	 *
+	 * @param fs
+	 * 		remote filesystem
+	 * @param appId
+	 * 		application ID
+	 * @param localRsrcPath
+	 * 		path to the local file
+	 * @param homedir
+	 * 		remote home directory base (will be extended)
+	 * @param relativeTargetPath
+	 * 		relative target path of the file (will be prefixed by the full home directory we set up)
+	 *
 	 * @return Path to remote file (usually hdfs)
-	 * @throws IOException
 	 */
-	public static Path setupLocalResource(
-		FileSystem fs,
-		String appId, Path localRsrcPath,
-		LocalResource appMasterJar,
-		Path homedir) throws IOException {
+	static Tuple2<Path, LocalResource> setupLocalResource(
+		FileSystem fs,
+		String appId,
+		Path localRsrcPath,
+		Path homedir,
+		String relativeTargetPath) throws IOException {
+
+		if (new File(localRsrcPath.toUri().getPath()).isDirectory()) {
+			throw new IllegalArgumentException("File to copy must not be a directory: " +
+				localRsrcPath);
+		}
 
 		// copy resource to HDFS
-		String suffix = ".flink/" + appId + "/" + localRsrcPath.getName();
+		String suffix = ".flink/" + appId + "/" + relativeTargetPath + "/" + localRsrcPath.getName();
--- End diff --

What if `relativeTargetPath` is `""`. Wouldn't that lead to 
`appId//localSrcPath.getName()`?


---


[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-06 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r149071109
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---
@@ -408,10 +437,12 @@ static ContainerLaunchContext createTaskExecutorContext(
 		// prepare additional files to be shipped
 		for (String pathStr : shipListString.split(",")) {
 			if (!pathStr.isEmpty()) {
+				String[] pathWithKey = pathStr.split("=");
--- End diff --

Isn't this rather `keyWithPath`?
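
For clarity, with a hypothetical ship-list entry (format "<key>=<path>"; the 
values below are made up) the array order is:

    String entry = "lib/flink-dist.jar=hdfs://namenode:8020/user/flink/.flink/app_1/lib/flink-dist.jar";
    String[] parts = entry.split("=");
    // parts[0] is the key, parts[1] is the path -- the key comes first,
    // hence the suggested name "keyWithPath"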


---


[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-03 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r148756933
  
--- Diff: flink-filesystems/flink-s3-fs-hadoop/pom.xml ---
@@ -182,6 +182,21 @@ under the License.
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
--- End diff --

Yes, this is about the recursive upload, which needs to be tested once with 
hdfs and once more with s3.

Sure, we could flip the dependency and let the tests in the `yarn` 
sub-project depend on `flink-s3-fs-hadoop` (and I don't mind which depends on 
which, actually) - but wouldn't this be just the same, only in reverse?


---


[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-03 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r148756295
  
--- Diff: flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java ---
@@ -57,11 +62,52 @@
 	private static final String ACCESS_KEY = System.getenv("ARTIFACTS_AWS_ACCESS_KEY");
 	private static final String SECRET_KEY = System.getenv("ARTIFACTS_AWS_SECRET_KEY");
 
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
 	@BeforeClass
-	public static void checkIfCredentialsArePresent() {
+	public static void checkCredentialsAndSetup() throws IOException {
+		// check whether credentials exist
 		Assume.assumeTrue("AWS S3 bucket not configured, skipping test...", BUCKET != null);
 		Assume.assumeTrue("AWS S3 access key not configured, skipping test...", ACCESS_KEY != null);
 		Assume.assumeTrue("AWS S3 secret key not configured, skipping test...", SECRET_KEY != null);
+
+		// initialize configuration with valid credentials
--- End diff --

This is actually not for the new test but for the cleanup: the current 
state of `HadoopS3FileSystemITCase` leaves this (random) test directory 
behind. In order to delete it after the class's tests have finished, I 
thought we should make sure that it did not exist before, so that we are not 
deleting something we shouldn't!
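
A sketch of that guard (field and method names are illustrative, not the 
final test code):

    @BeforeClass
    public static void checkCredentialsAndSetup() throws IOException {
        // ... credential assumptions as above ...

        // only proceed if the random test directory does not exist yet, so the
        // @AfterClass cleanup can never delete data we do not own
        basePath = new Path("s3://" + BUCKET + "/temp/tests-" + UUID.randomUUID());
        FileSystem fs = basePath.getFileSystem();
        Assume.assumeFalse("Test directory exists already", fs.exists(basePath));
    }

    @AfterClass
    public static void cleanUp() throws IOException {
        // delete the directory we verified to be ours during setup
        basePath.getFileSystem().delete(basePath, true);
    }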


---


[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-02 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r148656504
  
--- Diff: flink-filesystems/flink-s3-fs-hadoop/pom.xml ---
@@ -182,6 +182,21 @@ under the License.
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
--- End diff --

Would be great if we could avoid adding these dependencies.
This couples projects that were really meant to be independent, even if 
just in test scope.

If this is about testing the recursive upload, can it be written properly as 
a test case in this project?
Or can the Yarn upload test live completely in the yarn test project, adding 
a dependency on this s3 project?


---


[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-02 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/4939#discussion_r148656918
  
--- Diff: flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java ---
@@ -57,11 +62,52 @@
 	private static final String ACCESS_KEY = System.getenv("ARTIFACTS_AWS_ACCESS_KEY");
 	private static final String SECRET_KEY = System.getenv("ARTIFACTS_AWS_SECRET_KEY");
 
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
 	@BeforeClass
-	public static void checkIfCredentialsArePresent() {
+	public static void checkCredentialsAndSetup() throws IOException {
+		// check whether credentials exist
 		Assume.assumeTrue("AWS S3 bucket not configured, skipping test...", BUCKET != null);
 		Assume.assumeTrue("AWS S3 access key not configured, skipping test...", ACCESS_KEY != null);
 		Assume.assumeTrue("AWS S3 secret key not configured, skipping test...", SECRET_KEY != null);
+
+		// initialize configuration with valid credentials
--- End diff --

I would suggest moving this out of the "setup" method into the actual test.
The setup logic is not shared (the other test methods don't assume that 
setup), and it also assumes the existence of functionality that is tested in 
other test methods.
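
A sketch of that separation (the test name and configuration key names are 
assumptions, not taken from the PR):

    @Test
    public void testDirectoryUpload() throws Exception {
        // initialize the file system only in the test that needs it, keeping
        // @BeforeClass limited to the shared credential assumptions
        final Configuration conf = new Configuration();
        conf.setString("s3.access-key", ACCESS_KEY); // hypothetical key names
        conf.setString("s3.secret-key", SECRET_KEY);
        FileSystem.initialize(conf);
        // ... exercise the recursive upload against the bucket ...
    }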


---


[GitHub] flink pull request #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3...

2017-11-02 Thread NicoK
GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/4939

[FLINK-4228][yarn/s3a] fix yarn resource upload s3a defaultFs

## What is the purpose of the change

If YARN is configured to use `s3a` as the default file system, uploading the 
Flink jars will fail since its 
`org.apache.hadoop.fs.FileSystem#copyFromLocalFile()` implementation does not 
work recursively on the given `lib` folder.

## Brief change log

- implement our own recursive upload (based on #2288); a sketch of the idea 
follows below
- add unit tests to verify its behaviour for both `hdfs://` and `s3://` 
(via S3A) resource uploads
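
A rough sketch of the idea behind the recursive upload (names and structure 
are illustrative, not the PR's exact code):

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.IOException;

    class RecursiveUploadSketch {

        // Walk the local directory tree and upload file by file:
        // FileSystem#copyFromLocalFile() cannot be relied upon to recurse into
        // directories when s3a is the default file system.
        static void upload(FileSystem localFs, FileSystem targetFs, Path src, Path dst) throws IOException {
            if (localFs.getFileStatus(src).isDirectory()) {
                targetFs.mkdirs(dst);
                for (FileStatus child : localFs.listStatus(src)) {
                    upload(localFs, targetFs, child.getPath(), new Path(dst, child.getPath().getName()));
                }
            } else {
                targetFs.copyFromLocalFile(false, true, src, dst);
            }
        }
    }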

## Verifying this change

This change added tests and can be verified as follows:

- added a unit test for HDFS uploads via our `MiniDFSCluster`
- added integration test to verify S3 uploads (via the S3A filesystem 
implementation of the `flink-s3-fs-hadoop` sub-project)
- manually verified the test on YARN with both S3A and HDFS default file 
systems being set

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes - internally)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-4228

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4939.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4939


commit 5d31f41e0e480820e9fec1efa84e5725364a136d
Author: Nico Kruber 
Date:   2017-11-02T18:38:48Z

[hotfix][s3] fix HadoopS3FileSystemITCase leaving test directories behind 
in S3

commit bf47d376397a8e64625a031468d5f5d0a5486238
Author: Nico Kruber 
Date:   2016-11-09T20:04:50Z

[FLINK-4228][yarn/s3] fix for yarn staging with s3a defaultFs

+ include a new unit test for recursive uploads to hdfs:// targets
+ add a unit test for recursive file uploads to s3:// via s3a




---