[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-27 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r339284001
 
 

 ##
 File path: flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
 ##
 @@ -312,6 +381,60 @@ private static void generateRandomDirs(File dir, int numFiles, int numDirs, int
}
}
 
+    /**
+     * Generate some files in the directory {@code dir}.
+     * @param dir the directory where the files are generated
+     * @return Tuple3 holding the generated files' absolute path, relative to the working directory path and relative
+     * url.
+     * @throws IOException if I/O error occurs while generating the files
+     */
+    public static Tuple3<Collection<File>, Collection<File>, Collection<URL>> prepareTestFiles(
+            final java.nio.file.Path dir) throws IOException {
+
+        Tuple3<Collection<File>, Collection<File>, Collection<URL>> result = new Tuple3<>();
+
+        result.f0 = generateSomeFilesInDirectoryReturnJarFiles(dir);
+        result.f1 = toRelativeFiles(result.f0);
+        result.f2 = toRelativeURLs(result.f1);
+
+        return result;
+    }
+
+    private static Collection<File> generateSomeFilesInDirectoryReturnJarFiles(
+            final java.nio.file.Path dir) throws IOException {
+
+        final java.nio.file.Path jobSubDir1 = Files.createDirectory(dir.resolve("_sub_dir1"));
+        final java.nio.file.Path jobSubDir2 = Files.createDirectory(dir.resolve("_sub_dir2"));
+        final java.nio.file.Path jarFile1 = Files.createFile(dir.resolve("file1.jar"));
+        final java.nio.file.Path jarFile2 = Files.createFile(dir.resolve("file2.jar"));
+        final java.nio.file.Path jarFile3 = Files.createFile(jobSubDir1.resolve("file3.jar"));
+        final java.nio.file.Path jarFile4 = Files.createFile(jobSubDir2.resolve("file4.jar"));
+        final Collection<File> jarFiles = new ArrayList<>();
+
+        Files.createFile(dir.resolve("file1.txt"));
+        Files.createFile(jobSubDir2.resolve("file2.txt"));
+
+        jarFiles.add(jarFile1.toFile());
+        jarFiles.add(jarFile2.toFile());
+        jarFiles.add(jarFile3.toFile());
+        jarFiles.add(jarFile4.toFile());
+        return jarFiles;
+    }
+
+    private static Collection<File> toRelativeFiles(Collection<File> files) {
+        final java.nio.file.Path workingDir = Paths.get(System.getProperty("user.dir"));
+        final Collection<File> relativeFiles = new ArrayList<>();
+        files.forEach(file -> relativeFiles.add(workingDir.relativize(file.toPath()).toFile()));
+        return relativeFiles;
+    }
+
+    private static Collection<URL> toRelativeURLs(Collection<File> relativeFiles) throws MalformedURLException {
+        final Collection<URL> relativeURLs = new ArrayList<>();
+        final URL context = new URL(relativeFiles.iterator().next().toURI().getScheme() + ":");
+        relativeFiles.forEach(FunctionUtils.uncheckedConsumer(file -> relativeURLs.add(new URL(context, file.toString();
 
 Review comment:
   Maybe `file.toString()` -> `file.getPath()`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-27 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r339385051
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetriever.java
 ##
 @@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.util.FileUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ *  Abstract class for the JobGraphRetriever, which wants to get classpath user's code depends on.
+ */
+public abstract class AbstractUserClassPathJobGraphRetriever implements JobGraphRetriever {
+
+    /* A collection of relative jar paths to the working directory */
+    private final List<URL> userClassPaths;
+
+    protected AbstractUserClassPathJobGraphRetriever(@Nullable final File jobDir) throws IOException {
+        if (jobDir == null) {
+            userClassPaths = Collections.emptyList();
+        } else {
+            final Collection<File> jarFiles = FileUtils.listFilesInPath(jobDir, file -> file.getName().endsWith(".jar"));
+            final Collection<File> relativeFiles = FileUtils.relativizeToWorkingDir(jarFiles);
+            this.userClassPaths = new ArrayList<>(FileUtils.toRelativeURLs(relativeFiles));
 
 Review comment:
   Or, if we want it to be a `List`, do we need a defined rule for ordering the classpaths?
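
   If the field stays a `List`, one possible rule is to sort the jar files by their relative path. This is an illustrative assumption, not something this PR settled on. Sorting means the classpath order no longer depends on the order in which `Files.walkFileTree` visits directory entries, which does not appear to be guaranteed. A minimal stdlib sketch; `ClassPathOrdering.sortedByPath` is a hypothetical helper:

   ```java
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;

public final class ClassPathOrdering {

    private ClassPathOrdering() {
    }

    /** Hypothetical helper: returns a copy of the files sorted by their path string. */
    public static List<File> sortedByPath(final Collection<File> files) {
        final List<File> sorted = new ArrayList<>(files);
        // Comparing on getPath() gives the same order on every run, independent of traversal order.
        sorted.sort(Comparator.comparing(File::getPath));
        return sorted;
    }
}
   ```

   Sorting the `File`s before they are converted to URLs would make `userClassPaths` stable across runs. Whether plain path order is the right precedence for user jars is a separate design question.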


[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-25 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r339283903
 
 

 ##
 File path: flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
 ##
 @@ -259,6 +267,67 @@ public void testCompression() throws IOException {
         assertDirEquals(compressDir.resolve(originalDir), extractDir.resolve(originalDir));
     }
 
+    @Test
+    public void testListFilesInPathWithoutAnyFileReturnEmptyList() throws IOException {
+        final java.nio.file.Path testDir = tmp.newFolder("_test_0").toPath();
+
+        assertTrue(Collections.emptyList() ==
+            FileUtils.listFilesInPath(testDir.toFile(), f -> f.getName().endsWith(".jar")));
+    }
+
+    @Test
+    public void testListFilesInPath() throws IOException {
+        final java.nio.file.Path testDir = tmp.newFolder("_test_1").toPath();
+        final Tuple3<Collection<File>, Collection<File>, Collection<URL>> result = prepareTestFiles(testDir);
 
 Review comment:
   I think it's better to let `prepareTestFiles` return only `Collection<File>`.
   Each test case can then convert it to its expected form with `toRelativeFiles` or `toRelativeURLs`.
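
   A rough stdlib-only sketch of that shape. It assumes the helper only creates the files and returns the generated jar `File`s, which are absolute because they are resolved against `dir`. `TestFilesSketch` is a hypothetical class, and the directory layout is trimmed down from the `generateSomeFilesInDirectoryReturnJarFiles` helper in the diff:

   ```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;

public final class TestFilesSketch {

    private TestFilesSketch() {
    }

    /** Creates a few files under {@code dir} and returns only the generated jar files. */
    public static Collection<File> prepareTestFiles(final Path dir) throws IOException {
        final Path subDir = Files.createDirectory(dir.resolve("_sub_dir1"));
        final Collection<File> jarFiles = new ArrayList<>();

        jarFiles.add(Files.createFile(dir.resolve("file1.jar")).toFile());
        jarFiles.add(Files.createFile(subDir.resolve("file3.jar")).toFile());

        // A non-jar file that a ".jar" filter is expected to skip; it is not returned.
        Files.createFile(dir.resolve("file1.txt"));
        return jarFiles;
    }
}
   ```

   A test that needs relative files or URLs would then derive them itself, e.g. `toRelativeURLs(toRelativeFiles(prepareTestFiles(testDir)))` with the helpers from the diff, instead of reading `result.f1` or `result.f2`.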


[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-25 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r339283582
 
 

 ##
 File path: flink-core/src/main/java/org/apache/flink/util/FileUtils.java
 ##
 @@ -525,6 +540,102 @@ public static Path expandDirectory(Path file, Path targetDirectory) throws IOExc
         return new Path(targetDirectory, rootDir);
     }
 
+    /**
+     * List the directory {@code directory} recursively and return the files that satisfies the {@code fileFilter}.
+     *
+     * @param directory the directory to be listed
+     * @param fileFilter a file filter
+     * @return a collection of {@code File}s
+     *
+     * @throws IOException if an I/O error occurs while listing the files in the given directory
+     */
+    public static Collection<File> listFilesInPath(@Nonnull final File directory, @Nonnull final Predicate<File> fileFilter) throws IOException {
+        if (!Files.exists(directory.toPath())) {
+            throw new IllegalArgumentException(String.format("The directory %s dose not exist.", directory));
+        }
+        if (!Files.isDirectory(directory.toPath())) {
+            throw new IllegalArgumentException(String.format("The %s is not a directory.", directory));
+        }
+
+        final FilterFileVisitor filterFileVisitor = new FilterFileVisitor(fileFilter);
+
+        Files.walkFileTree(
+            directory.toPath(),
+            EnumSet.of(FileVisitOption.FOLLOW_LINKS),
+            Integer.MAX_VALUE,
+            filterFileVisitor
+        );
+        return filterFileVisitor.getFiles().isEmpty() ? Collections.emptyList() : filterFileVisitor.getFiles();
+    }
+
+    /**
+     * Convert a collection of {@code File}s to a collection of relative path to the working dir.
+     *
+     * @param files a collection of files needed to be relatived
+     * @return a collection of relative {@code File}s
+     */
+    public static Collection<File> relativizeToWorkingDir(@Nonnull final Collection<File> files) {
+        final java.nio.file.Path workingDirPath = Paths.get(System.getProperty("user.dir"));
+
+        final List<File> relativeFiles = new LinkedList<>();
+
+        for (File file : files) {
+            if (file.isAbsolute()) {
+                relativeFiles.add(workingDirPath.relativize(file.toPath()).toFile());
+            } else {
+                relativeFiles.add(file);
+            }
+        }
+        return relativeFiles.isEmpty() ? Collections.emptyList() : relativeFiles;
+    }
+
+    /**
+     * Convert a collection of relative {@code File}s to a collection of relative {@code URL}s.
+     *
+     * @param relativeFiles a collection of relative {@code File}s
+     * @return a collection of relative URLs
+     *
+     * @throws MalformedURLException if error occurs while construct a url.
+     */
+    public static Collection<URL> toRelativeURLs(@Nonnull final Collection<File> relativeFiles) throws MalformedURLException {
+        final List<URL> urls = new LinkedList<>();
+
+        for (File file : relativeFiles) {
+            urls.add(
+                new URL(
+                    new URL(file.toURI().getScheme() + ":"),
+                    file.toString()
 
 Review comment:
   Maybe `file.toString()` -> `file.getPath()`, so we do not need to understand how `File.toString()` works.
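
   For reference, the `java.io.File` Javadoc defines `toString()` as returning the same string as `getPath()`, so the suggestion changes readability, not behavior. Below is a minimal sketch of the per-file conversion written with `getPath()`. `RelativeUrls.toRelativeUrl` is a hypothetical name, and the `"file:"` context URL mirrors the diff's `file.toURI().getScheme() + ":"`:

   ```java
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;

public final class RelativeUrls {

    private RelativeUrls() {
    }

    /** Builds a "file" URL whose path stays relative, e.g. "file:lib/a.jar". */
    public static URL toRelativeUrl(final File relativeFile) throws MalformedURLException {
        // toURI() always yields an absolute "file" URI; only its scheme is reused as the context.
        final URL context = new URL(relativeFile.toURI().getScheme() + ":");
        // getPath() returns the same string as toString(), but states the intent explicitly.
        return new URL(context, relativeFile.getPath());
    }
}
   ```

   For `new File("lib/a.jar")` this should yield a `file` URL whose path is still `lib/a.jar`.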


[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-25 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r339283383
 
 

 ##
 File path: flink-core/src/main/java/org/apache/flink/util/FileUtils.java
 ##
 @@ -525,6 +540,102 @@ public static Path expandDirectory(Path file, Path targetDirectory) throws IOExc
         return new Path(targetDirectory, rootDir);
     }
 
+    /**
+     * List the directory {@code directory} recursively and return the files that satisfies the {@code fileFilter}.
+     *
+     * @param directory the directory to be listed
+     * @param fileFilter a file filter
+     * @return a collection of {@code File}s
+     *
+     * @throws IOException if an I/O error occurs while listing the files in the given directory
+     */
+    public static Collection<File> listFilesInPath(@Nonnull final File directory, @Nonnull final Predicate<File> fileFilter) throws IOException {
+        if (!Files.exists(directory.toPath())) {
+            throw new IllegalArgumentException(String.format("The directory %s dose not exist.", directory));
+        }
+        if (!Files.isDirectory(directory.toPath())) {
+            throw new IllegalArgumentException(String.format("The %s is not a directory.", directory));
+        }
+
+        final FilterFileVisitor filterFileVisitor = new FilterFileVisitor(fileFilter);
+
+        Files.walkFileTree(
+            directory.toPath(),
+            EnumSet.of(FileVisitOption.FOLLOW_LINKS),
+            Integer.MAX_VALUE,
+            filterFileVisitor
+        );
+        return filterFileVisitor.getFiles().isEmpty() ? Collections.emptyList() : filterFileVisitor.getFiles();
+    }
+
+    /**
+     * Convert a collection of {@code File}s to a collection of relative path to the working dir.
+     *
+     * @param files a collection of files needed to be relatived
+     * @return a collection of relative {@code File}s
+     */
+    public static Collection<File> relativizeToWorkingDir(@Nonnull final Collection<File> files) {
 
 Review comment:
   I think `@Nonnull` is not needed, but a `checkNotNull` is.
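
   `@Nonnull` is only a hint for static analysis tools and is not enforced at runtime. An explicit check fails fast with a clear message. Below is a sketch of how `relativizeToWorkingDir` could look with such a check. Flink code would normally use `org.apache.flink.util.Preconditions.checkNotNull`; `java.util.Objects.requireNonNull` is swapped in here only so the example compiles on its own. `RelativizeSketch` is a hypothetical class:

   ```java
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;

public final class RelativizeSketch {

    private RelativizeSketch() {
    }

    public static List<File> relativizeToWorkingDir(final Collection<File> files) {
        // Runtime check instead of a @Nonnull annotation; throws NullPointerException on null input.
        Objects.requireNonNull(files, "files must not be null");
        final Path workingDirPath = Paths.get(System.getProperty("user.dir"));

        final List<File> relativeFiles = new ArrayList<>();
        for (File file : files) {
            // Absolute files are made relative to the working dir; relative ones are kept as-is.
            relativeFiles.add(
                file.isAbsolute() ? workingDirPath.relativize(file.toPath()).toFile() : file);
        }
        // An empty input simply yields an empty list.
        return relativeFiles;
    }
}
   ```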


[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-25 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r339284116
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetriever.java
 ##
 @@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.util.FileUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ *  Abstract class for the JobGraphRetriever, which wants to get classpath user's code depends on.
+ */
+public abstract class AbstractUserClassPathJobGraphRetriever implements JobGraphRetriever {
+
+    /* A collection of relative jar paths to the working directory */
 
 Review comment:
   ```suggestion
/** User classpaths in relative form to the working directory. */
   ```


[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-25 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r339283797
 
 

 ##
 File path: flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
 ##
 @@ -259,6 +267,67 @@ public void testCompression() throws IOException {
         assertDirEquals(compressDir.resolve(originalDir), extractDir.resolve(originalDir));
     }
 
+    @Test
+    public void testListFilesInPathWithoutAnyFileReturnEmptyList() throws IOException {
+        final java.nio.file.Path testDir = tmp.newFolder("_test_0").toPath();
+
+        assertTrue(Collections.emptyList() ==
+            FileUtils.listFilesInPath(testDir.toFile(), f -> f.getName().endsWith(".jar")));
 
 Review comment:
   ```suggestion

assertTrue(FileUtils.listFilesInPath(testDir.toFile(), f -> f.getName().endsWith(".jar")).isEmpty());
   ```
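
   The original assertion compares references with `==`. It therefore passes only while `listFilesInPath` happens to return the shared `Collections.emptyList()` instance rather than, say, an empty `ArrayList`. Checking `isEmpty()` tests the behavior instead. A tiny stdlib illustration (`EmptyCheckDemo` is a hypothetical class):

   ```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

public final class EmptyCheckDemo {

    private EmptyCheckDemo() {
    }

    /** Identity check, as in the original test: true only for the shared Collections.emptyList() instance. */
    public static boolean isSharedEmptyList(final Collection<?> c) {
        final Object sharedEmpty = Collections.emptyList();
        return c == sharedEmpty;
    }

    /** Behavioral check, as in the suggestion: true for any empty collection. */
    public static boolean isEmpty(final Collection<?> c) {
        return c.isEmpty();
    }

    public static void main(String[] args) {
        final Collection<String> freshEmpty = new ArrayList<>();
        System.out.println(isSharedEmptyList(freshEmpty)); // prints false
        System.out.println(isEmpty(freshEmpty)); // prints true
    }
}
   ```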


[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-25 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r339284095
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetriever.java
 ##
 @@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.util.FileUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ *  Abstract class for the JobGraphRetriever, which wants to get classpath user's code depends on.
 
 Review comment:
   ```suggestion
*  Abstract class for the JobGraphRetriever which supports getting user classpaths.
   ```


[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-25 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r339283412
 
 

 ##
 File path: flink-core/src/main/java/org/apache/flink/util/FileUtils.java
 ##
 @@ -525,6 +540,102 @@ public static Path expandDirectory(Path file, Path targetDirectory) throws IOExc
         return new Path(targetDirectory, rootDir);
     }
 
+    /**
+     * List the directory {@code directory} recursively and return the files that satisfies the {@code fileFilter}.
+     *
+     * @param directory the directory to be listed
+     * @param fileFilter a file filter
+     * @return a collection of {@code File}s
+     *
+     * @throws IOException if an I/O error occurs while listing the files in the given directory
+     */
+    public static Collection<File> listFilesInPath(@Nonnull final File directory, @Nonnull final Predicate<File> fileFilter) throws IOException {
+        if (!Files.exists(directory.toPath())) {
+            throw new IllegalArgumentException(String.format("The directory %s dose not exist.", directory));
+        }
+        if (!Files.isDirectory(directory.toPath())) {
+            throw new IllegalArgumentException(String.format("The %s is not a directory.", directory));
+        }
+
+        final FilterFileVisitor filterFileVisitor = new FilterFileVisitor(fileFilter);
+
+        Files.walkFileTree(
+            directory.toPath(),
+            EnumSet.of(FileVisitOption.FOLLOW_LINKS),
+            Integer.MAX_VALUE,
+            filterFileVisitor
+        );
+        return filterFileVisitor.getFiles().isEmpty() ? Collections.emptyList() : filterFileVisitor.getFiles();
+    }
+
+    /**
+     * Convert a collection of {@code File}s to a collection of relative path to the working dir.
+     *
+     * @param files a collection of files needed to be relatived
+     * @return a collection of relative {@code File}s
+     */
+    public static Collection<File> relativizeToWorkingDir(@Nonnull final Collection<File> files) {
+        final java.nio.file.Path workingDirPath = Paths.get(System.getProperty("user.dir"));
+
+        final List<File> relativeFiles = new LinkedList<>();
+
+        for (File file : files) {
+            if (file.isAbsolute()) {
+                relativeFiles.add(workingDirPath.relativize(file.toPath()).toFile());
+            } else {
+                relativeFiles.add(file);
+            }
+        }
+        return relativeFiles.isEmpty() ? Collections.emptyList() : relativeFiles;
 
 Review comment:
   ```suggestion
return relativeFiles;
   ```


[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-25 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r339283480
 
 

 ##
 File path: flink-core/src/main/java/org/apache/flink/util/FileUtils.java
 ##
 @@ -525,6 +540,102 @@ public static Path expandDirectory(Path file, Path targetDirectory) throws IOExc
         return new Path(targetDirectory, rootDir);
     }
 
+    /**
+     * List the directory {@code directory} recursively and return the files that satisfies the {@code fileFilter}.
+     *
+     * @param directory the directory to be listed
+     * @param fileFilter a file filter
+     * @return a collection of {@code File}s
+     *
+     * @throws IOException if an I/O error occurs while listing the files in the given directory
+     */
+    public static Collection<File> listFilesInPath(@Nonnull final File directory, @Nonnull final Predicate<File> fileFilter) throws IOException {
+        if (!Files.exists(directory.toPath())) {
+            throw new IllegalArgumentException(String.format("The directory %s dose not exist.", directory));
+        }
+        if (!Files.isDirectory(directory.toPath())) {
+            throw new IllegalArgumentException(String.format("The %s is not a directory.", directory));
+        }
+
+        final FilterFileVisitor filterFileVisitor = new FilterFileVisitor(fileFilter);
+
+        Files.walkFileTree(
+            directory.toPath(),
+            EnumSet.of(FileVisitOption.FOLLOW_LINKS),
+            Integer.MAX_VALUE,
+            filterFileVisitor
+        );
+        return filterFileVisitor.getFiles().isEmpty() ? Collections.emptyList() : filterFileVisitor.getFiles();
+    }
+
+    /**
+     * Convert a collection of {@code File}s to a collection of relative path to the working dir.
+     *
+     * @param files a collection of files needed to be relatived
+     * @return a collection of relative {@code File}s
+     */
+    public static Collection<File> relativizeToWorkingDir(@Nonnull final Collection<File> files) {
+        final java.nio.file.Path workingDirPath = Paths.get(System.getProperty("user.dir"));
+
+        final List<File> relativeFiles = new LinkedList<>();
+
+        for (File file : files) {
+            if (file.isAbsolute()) {
+                relativeFiles.add(workingDirPath.relativize(file.toPath()).toFile());
+            } else {
+                relativeFiles.add(file);
+            }
+        }
+        return relativeFiles.isEmpty() ? Collections.emptyList() : relativeFiles;
+    }
+
+    /**
+     * Convert a collection of relative {@code File}s to a collection of relative {@code URL}s.
+     *
+     * @param relativeFiles a collection of relative {@code File}s
+     * @return a collection of relative URLs
+     *
+     * @throws MalformedURLException if error occurs while construct a url.
+     */
+    public static Collection<URL> toRelativeURLs(@Nonnull final Collection<File> relativeFiles) throws MalformedURLException {
+        final List<URL> urls = new LinkedList<>();
+
+        for (File file : relativeFiles) {
+            urls.add(
 
 Review comment:
   We need to check that each file is really a relative `File`.
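
   Below is a sketch of such a check. It assumes absolute input should be rejected with an `IllegalArgumentException`. Flink code would typically use `Preconditions.checkArgument` for this; a plain `if`/`throw` keeps the example self-contained. The sketch also folds in the `checkNotNull` and `getPath()` points from the other comments on this method. `RelativeUrlCheckSketch` is a hypothetical class:

   ```java
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;

public final class RelativeUrlCheckSketch {

    private RelativeUrlCheckSketch() {
    }

    public static List<URL> toRelativeURLs(final Collection<File> relativeFiles) throws MalformedURLException {
        Objects.requireNonNull(relativeFiles, "relativeFiles must not be null");
        final List<URL> urls = new ArrayList<>();

        for (File file : relativeFiles) {
            // Reject absolute files up front instead of producing a URL that is not relative.
            if (file.isAbsolute()) {
                throw new IllegalArgumentException("Expected a relative file, but got " + file);
            }
            urls.add(new URL(new URL(file.toURI().getScheme() + ":"), file.getPath()));
        }
        return urls;
    }
}
   ```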


[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-25 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r339283446
 
 

 ##
 File path: flink-core/src/main/java/org/apache/flink/util/FileUtils.java
 ##
 @@ -525,6 +540,102 @@ public static Path expandDirectory(Path file, Path targetDirectory) throws IOExc
         return new Path(targetDirectory, rootDir);
     }
 
+    /**
+     * List the directory {@code directory} recursively and return the files that satisfies the {@code fileFilter}.
+     *
+     * @param directory the directory to be listed
+     * @param fileFilter a file filter
+     * @return a collection of {@code File}s
+     *
+     * @throws IOException if an I/O error occurs while listing the files in the given directory
+     */
+    public static Collection<File> listFilesInPath(@Nonnull final File directory, @Nonnull final Predicate<File> fileFilter) throws IOException {
+        if (!Files.exists(directory.toPath())) {
+            throw new IllegalArgumentException(String.format("The directory %s dose not exist.", directory));
+        }
+        if (!Files.isDirectory(directory.toPath())) {
+            throw new IllegalArgumentException(String.format("The %s is not a directory.", directory));
+        }
+
+        final FilterFileVisitor filterFileVisitor = new FilterFileVisitor(fileFilter);
+
+        Files.walkFileTree(
+            directory.toPath(),
+            EnumSet.of(FileVisitOption.FOLLOW_LINKS),
+            Integer.MAX_VALUE,
+            filterFileVisitor
+        );
+        return filterFileVisitor.getFiles().isEmpty() ? Collections.emptyList() : filterFileVisitor.getFiles();
+    }
+
+    /**
+     * Convert a collection of {@code File}s to a collection of relative path to the working dir.
+     *
+     * @param files a collection of files needed to be relatived
+     * @return a collection of relative {@code File}s
+     */
+    public static Collection<File> relativizeToWorkingDir(@Nonnull final Collection<File> files) {
+        final java.nio.file.Path workingDirPath = Paths.get(System.getProperty("user.dir"));
+
+        final List<File> relativeFiles = new LinkedList<>();
+
+        for (File file : files) {
+            if (file.isAbsolute()) {
+                relativeFiles.add(workingDirPath.relativize(file.toPath()).toFile());
+            } else {
+                relativeFiles.add(file);
+            }
+        }
+        return relativeFiles.isEmpty() ? Collections.emptyList() : relativeFiles;
+    }
+
+    /**
+     * Convert a collection of relative {@code File}s to a collection of relative {@code URL}s.
+     *
+     * @param relativeFiles a collection of relative {@code File}s
+     * @return a collection of relative URLs
+     *
+     * @throws MalformedURLException if error occurs while construct a url.
+     */
+    public static Collection<URL> toRelativeURLs(@Nonnull final Collection<File> relativeFiles) throws MalformedURLException {
 
 Review comment:
   I think `@Nonnull` is not needed, but a `checkNotNull` is.


[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-25 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r339283327
 
 

 ##
 File path: flink-core/src/main/java/org/apache/flink/util/FileUtils.java
 ##
 @@ -525,6 +540,102 @@ public static Path expandDirectory(Path file, Path targetDirectory) throws IOExc
         return new Path(targetDirectory, rootDir);
     }
 
+    /**
+     * List the directory {@code directory} recursively and return the files that satisfies the {@code fileFilter}.
 
 Review comment:
   ```suggestion
 * List the {@code directory} recursively and return the files that satisfy the {@code fileFilter}.
   ```


[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-25 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r339283490
 
 

 ##
 File path: flink-core/src/main/java/org/apache/flink/util/FileUtils.java
 ##
 @@ -525,6 +540,102 @@ public static Path expandDirectory(Path file, Path 
targetDirectory) throws IOExc
return new Path(targetDirectory, rootDir);
}
 
+   /**
+* List the directory {@code directory} recursively and return the files that satisfies the {@code fileFilter}.
+*
+* @param directory the directory to be listed
+* @param fileFilter a file filter
+* @return a collection of {@code File}s
+*
+* @throws IOException if an I/O error occurs while listing the files in the given directory
+*/
+   public static Collection<File> listFilesInPath(@Nonnull final File directory, @Nonnull final Predicate<File> fileFilter) throws IOException {
+   if (!Files.exists(directory.toPath())) {
+   throw new IllegalArgumentException(String.format("The directory %s dose not exist.", directory));
+   }
+   if (!Files.isDirectory(directory.toPath())) {
+   throw new IllegalArgumentException(String.format("The %s is not a directory.", directory));
+   }
+
+   final FilterFileVisitor filterFileVisitor = new FilterFileVisitor(fileFilter);
+
+   Files.walkFileTree(
+   directory.toPath(),
+   EnumSet.of(FileVisitOption.FOLLOW_LINKS),
+   Integer.MAX_VALUE,
+   filterFileVisitor
+   );
+   return filterFileVisitor.getFiles().isEmpty() ? Collections.emptyList() : filterFileVisitor.getFiles();
+   }
+
+   /**
+* Convert a collection of {@code File}s to a collection of relative path to the working dir.
+*
+* @param files a collection of files needed to be relatived
+* @return a collection of relative {@code File}s
+*/
+   public static Collection<File> relativizeToWorkingDir(@Nonnull final Collection<File> files) {
+   final java.nio.file.Path workingDirPath = Paths.get(System.getProperty("user.dir"));
+
+   final List<File> relativeFiles = new LinkedList<>();
+
+   for (File file : files) {
+   if (file.isAbsolute()) {
+   relativeFiles.add(workingDirPath.relativize(file.toPath()).toFile());
+   } else {
+   relativeFiles.add(file);
+   }
+   }
+   return relativeFiles.isEmpty() ? Collections.emptyList() : relativeFiles;
+   }
+
+   /**
+* Convert a collection of relative {@code File}s to a collection of relative {@code URL}s.
+*
+* @param relativeFiles a collection of relative {@code File}s
+* @return a collection of relative URLs
+*
+* @throws MalformedURLException if error occurs while construct a url.
+*/
+   public static Collection<URL> toRelativeURLs(@Nonnull final Collection<File> relativeFiles) throws MalformedURLException {
+   final List<URL> urls = new LinkedList<>();
+
+   for (File file : relativeFiles) {
+   urls.add(
+   new URL(
+   new URL(file.toURI().getScheme() + ":"),
+   file.toString()
+   )
+   );
+   }
+   return urls.isEmpty() ? Collections.emptyList() : urls;
 
 Review comment:
   ```suggestion
return urls;
   ```
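A minimal sketch of `toRelativeURLs` with the suggestion applied. The class name `RelativeUrlSketch` is hypothetical and this is not Flink's actual `FileUtils`. When there are no input files the accumulated list is already empty, so the `isEmpty() ? Collections.emptyList() : urls` branch adds nothing; callers should check `isEmpty()` instead of relying on getting the shared singleton back.

```java
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

/** Hypothetical stand-in for FileUtils#toRelativeURLs with the suggested simplification. */
final class RelativeUrlSketch {

    private RelativeUrlSketch() {}

    static Collection<URL> toRelativeURLs(Collection<File> relativeFiles) throws MalformedURLException {
        // A protocol-only context ("file:") lets the URL constructor accept a relative spec.
        final URL context = new URL("file:");
        final List<URL> urls = new ArrayList<>(relativeFiles.size());
        for (File file : relativeFiles) {
            urls.add(new URL(context, file.getPath()));
        }
        // No special case for empty input: an empty ArrayList already is an empty collection.
        return urls;
    }

    public static void main(String[] args) throws MalformedURLException {
        Collection<URL> urls = toRelativeURLs(Arrays.asList(new File("lib/a.jar"), new File("b.jar")));
        System.out.println(urls);
        System.out.println(toRelativeURLs(new ArrayList<>()).isEmpty()); // true
    }
}
```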


[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-25 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r339283968
 
 

 ##
 File path: flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
 ##
 @@ -259,6 +267,67 @@ public void testCompression() throws IOException {
assertDirEquals(compressDir.resolve(originalDir), extractDir.resolve(originalDir));
}
 
+   @Test
+   public void testListFilesInPathWithoutAnyFileReturnEmptyList() throws IOException {
+   final java.nio.file.Path testDir = tmp.newFolder("_test_0").toPath();
+
+   assertTrue(Collections.emptyList() ==
+   FileUtils.listFilesInPath(testDir.toFile(), f -> f.getName().endsWith(".jar")));
+   }
+
+   @Test
+   public void testListFilesInPath() throws IOException {
+   final java.nio.file.Path testDir = tmp.newFolder("_test_1").toPath();
+   final Tuple3<Collection<File>, Collection<File>, Collection<URL>> result = prepareTestFiles(testDir);
+
+   assertTrue(CollectionUtils.isEqualCollection(result.f0,
+   FileUtils.listFilesInPath(testDir.toFile(), f -> f.getName().endsWith(".jar"))));
+   }
+
+   @Test
+   public void testRelativizeToWorkingDir() throws IOException {
+   final java.nio.file.Path testDir = tmp.newFolder("_test_2").toPath();
+   final Tuple3<Collection<File>, Collection<File>, Collection<URL>> result = prepareTestFiles(testDir);
+   final Collection<File> relativeFiles = FileUtils.relativizeToWorkingDir(result.f0);
+   relativeFiles.forEach(file -> assertFalse(file.isAbsolute()));
+   assertTrue(
+   CollectionUtils.isEqualCollection(
+   result.f1,
+   FileUtils.relativizeToWorkingDir(result.f0)
+   )
+   );
+   }
+
+   @Test
+   public void testToRelativeURLs() throws IOException {
+   final java.nio.file.Path testDir = tmp.newFolder("_test_3").toPath();
+   final Tuple3<Collection<File>, Collection<File>, Collection<URL>> result = prepareTestFiles(testDir);
+
+   final Collection<URL> relativeURLs = FileUtils.toRelativeURLs(result.f1);
+   relativeURLs.forEach(url -> assertFalse(new File(url.getPath()).isAbsolute()));
+
+   assertTrue(
+   CollectionUtils.isEqualCollection(
+   result.f2,
+   relativeURLs
+   )
+   );
+   }
+
+   @Test(expected = IllegalArgumentException.class)
+   public void testListDirFailsIfDirectoryDoesNotExist() throws IOException {
+   final String fileName = "_does_not_exists_file";
+   final String doesNotExistsFilePath = tmp.getRoot() + "/" + fileName;
+
+   FileUtils.listFilesInPath(new File(doesNotExistsFilePath), f -> f.getName().endsWith(".jar"));
+   }
+
+   @Test(expected = IllegalArgumentException.class)
+   public void testListAFileFailsBecauseDirectoryIsExpected() throws IOException {
 
 Review comment:
   Maybe rename `testListAFileFailsBecauseDirectoryIsExpected` to `testListDirFailsIfParamIsNotDirectory`?


[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-25 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r339284289
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetrieverTest.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.FileUtilsTest;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link AbstractUserClassPathJobGraphRetriever}.
+ */
+public class AbstractUserClassPathJobGraphRetrieverTest extends TestLogger {
+
+   @Rule
+   public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private static class TestJobGraphRetriever extends AbstractUserClassPathJobGraphRetriever {
+   public TestJobGraphRetriever(@Nonnull final File jobDir) throws IOException {
+   super(jobDir);
+   }
+
+   @Override
+   public JobGraph retrieveJobGraph(Configuration configuration) {
+   return null;
+   }
+   }
+
+   @Test
+   public void testGetUserClassPath() throws IOException {
+   final Path testJobDir = temporaryFolder.newFolder("_test_job").toPath();
+   final Tuple3<Collection<File>, Collection<File>, Collection<URL>>
+   result = FileUtilsTest.prepareTestFiles(testJobDir);
+   final TestJobGraphRetriever testJobGraphRetriever = new TestJobGraphRetriever(testJobDir.toFile());
+   assertTrue(CollectionUtils.isEqualCollection(result.f2, testJobGraphRetriever.getUserClassPaths()));
+   }
+
+   @Test(expected = IllegalArgumentException.class)
+   public void testTheJobGraphRetrieverThrowExceptionBecauseJobDirDoesNotHaveAnyJars() throws IOException {
+   final Path testJobDir = temporaryFolder.newFolder("_test_job_").toPath();
+   new TestJobGraphRetriever(testJobDir.toFile());
+   }
+
+   @Test
+   public void testGetUserClassPathReturnEmptyListIfJobDirIsNull() throws IOException {
+   final TestJobGraphRetriever testJobGraphRetriever = new TestJobGraphRetriever(null);
+   assertTrue(Collections.emptyList() == testJobGraphRetriever.getUserClassPaths());
 
 Review comment:
   ```suggestion
assertTrue(testJobGraphRetriever.getUserClassPaths().isEmpty());
   ```
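Why `isEmpty()` is the safer assertion: comparing against `Collections.emptyList()` with `==` only passes if the code under test happens to return that exact shared instance, which is an implementation detail. The sketch below uses a hypothetical `collect` method to show a result that is empty but is not the singleton.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Illustrates why `== Collections.emptyList()` is a brittle way to test for "no elements". */
final class EmptyCheckSketch {

    private EmptyCheckSketch() {}

    /** Hypothetical producer: may return a fresh empty list or the shared empty singleton. */
    static List<String> collect(boolean useSingletonWhenEmpty) {
        List<String> result = new ArrayList<>();
        return (useSingletonWhenEmpty && result.isEmpty()) ? Collections.<String>emptyList() : result;
    }

    public static void main(String[] args) {
        List<String> fresh = collect(false);
        // Reference comparison depends on how the producer builds its result...
        System.out.println(fresh == Collections.<String>emptyList()); // false
        // ...while isEmpty() checks only the contract the test cares about.
        System.out.println(fresh.isEmpty());                          // true
        System.out.println(collect(true).isEmpty());                  // true
    }
}
```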


[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-25 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r339284001
 
 

 ##
 File path: flink-core/src/test/java/org/apache/flink/util/FileUtilsTest.java
 ##
 @@ -312,6 +381,60 @@ private static void generateRandomDirs(File dir, int numFiles, int numDirs, int
}
}
 
+   /**
+* Generate some files in the directory {@code dir}.
+* @param dir the directory where the files are generated
+* @return Tuple3 holding the generated files' absolute path, relative to the working directory path and relative
+* url.
+* @throws IOException if I/O error occurs while generating the files
+*/
+   public static Tuple3<Collection<File>, Collection<File>, Collection<URL>> prepareTestFiles(
+   final java.nio.file.Path dir) throws IOException {
+
+   Tuple3<Collection<File>, Collection<File>, Collection<URL>> result = new Tuple3<>();
+
+   result.f0 = generateSomeFilesInDirectoryReturnJarFiles(dir);
+   result.f1 = toRelativeFiles(result.f0);
+   result.f2 = toRelativeURLs(result.f1);
+
+   return result;
+   }
+
+   private static Collection<File> generateSomeFilesInDirectoryReturnJarFiles(
+   final java.nio.file.Path dir) throws IOException {
+
+   final java.nio.file.Path jobSubDir1 = Files.createDirectory(dir.resolve("_sub_dir1"));
+   final java.nio.file.Path jobSubDir2 = Files.createDirectory(dir.resolve("_sub_dir2"));
+   final java.nio.file.Path jarFile1 = Files.createFile(dir.resolve("file1.jar"));
+   final java.nio.file.Path jarFile2 = Files.createFile(dir.resolve("file2.jar"));
+   final java.nio.file.Path jarFile3 = Files.createFile(jobSubDir1.resolve("file3.jar"));
+   final java.nio.file.Path jarFile4 = Files.createFile(jobSubDir2.resolve("file4.jar"));
+   final Collection<File> jarFiles = new ArrayList<>();
+
+   Files.createFile(dir.resolve("file1.txt"));
+   Files.createFile(jobSubDir2.resolve("file2.txt"));
+
+   jarFiles.add(jarFile1.toFile());
+   jarFiles.add(jarFile2.toFile());
+   jarFiles.add(jarFile3.toFile());
+   jarFiles.add(jarFile4.toFile());
+   return jarFiles;
+   }
+
+   private static Collection<File> toRelativeFiles(Collection<File> files) {
+   final java.nio.file.Path workingDir = Paths.get(System.getProperty("user.dir"));
+   final Collection<File> relativeFiles = new ArrayList<>();
+   files.forEach(file -> relativeFiles.add(workingDir.relativize(file.toPath()).toFile()));
+   return relativeFiles;
+   }
+
+   private static Collection<URL> toRelativeURLs(Collection<File> relativeFiles) throws MalformedURLException {
+   final Collection<URL> relativeURLs = new ArrayList<>();
+   final URL context = new URL(relativeFiles.iterator().next().toURI().getScheme() + ":");
+   relativeFiles.forEach(FunctionUtils.uncheckedConsumer(file -> relativeURLs.add(new URL(context, file.toString()))));
 
 Review comment:
   Maybe replace `file.toString()` with `file.toPath()`?


[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-25 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r339284193
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetriever.java
 ##
 @@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.util.FileUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ *  Abstract class for the JobGraphRetriever, which wants to get classpath user's code depends on.
+ */
+public abstract class AbstractUserClassPathJobGraphRetriever implements JobGraphRetriever {
+
+   /* A collection of relative jar paths to the working directory */
+   private final List<URL> userClassPaths;
+
+   protected AbstractUserClassPathJobGraphRetriever(@Nullable final File jobDir) throws IOException {
+   if (jobDir == null) {
+   userClassPaths = Collections.emptyList();
+   } else {
+   final Collection<File> jarFiles = FileUtils.listFilesInPath(jobDir, file -> file.getName().endsWith(".jar"));
+   final Collection<File> relativeFiles = FileUtils.relativizeToWorkingDir(jarFiles);
+   this.userClassPaths = new ArrayList<>(FileUtils.toRelativeURLs(relativeFiles));
 
 Review comment:
   Can we change the type of `userClassPaths` to `Collection<URL>` so that we do not need the list conversion?
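A minimal sketch of the suggestion, using a hypothetical `UserClassPathHolder` rather than the real retriever: typing the field as `Collection<URL>` lets the constructor store the computed result directly, so the `new ArrayList<>(...)` copy disappears. A `null` argument here merely stands in for the original "no job dir" branch.

```java
import java.net.URL;
import java.util.Collection;
import java.util.Collections;

/**
 * Hypothetical holder illustrating the review suggestion: the field is typed as
 * Collection<URL>, so the result of the class path computation can be stored as-is.
 */
final class UserClassPathHolder {

    private final Collection<URL> userClassPaths;

    UserClassPathHolder(Collection<URL> computedPaths) {
        // null mirrors the "no job dir" branch; otherwise store the computed collection directly.
        this.userClassPaths = computedPaths == null ? Collections.<URL>emptyList() : computedPaths;
    }

    Collection<URL> getUserClassPaths() {
        return userClassPaths;
    }
}
```

If the producer could keep mutating the collection after handing it over, wrapping it with `Collections.unmodifiableCollection` is a cheap way to keep the holder's view stable without copying.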


[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-25 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r339283097
 
 

 ##
 File path: flink-core/src/main/java/org/apache/flink/util/FileUtils.java
 ##
 @@ -525,6 +540,102 @@ public static Path expandDirectory(Path file, Path targetDirectory) throws IOExc
return new Path(targetDirectory, rootDir);
}
 
+   /**
+* List the directory {@code directory} recursively and return the files that satisfies the {@code fileFilter}.
+*
+* @param directory the directory to be listed
+* @param fileFilter a file filter
+* @return a collection of {@code File}s
+*
+* @throws IOException if an I/O error occurs while listing the files in the given directory
+*/
+   public static Collection<File> listFilesInPath(@Nonnull final File directory, @Nonnull final Predicate<File> fileFilter) throws IOException {
 
 Review comment:
   I think we do not need the @Nonnull annotations, since parameters are supposed to be non-null by default.
   Instead, we should add null checks before using these parameters.
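A minimal sketch of validating the parameters up front instead of annotating them. Flink code typically uses `org.apache.flink.util.Preconditions.checkNotNull` for this; the sketch swaps in the JDK's `Objects.requireNonNull` so it stays self-contained. The class and method names are hypothetical.

```java
import java.io.File;
import java.util.Objects;
import java.util.function.Predicate;

/** Hypothetical sketch: explicit argument checks for a listFilesInPath-style method. */
final class ListFilesPreconditions {

    private ListFilesPreconditions() {}

    static void checkListFilesArguments(File directory, Predicate<File> fileFilter) {
        // Fail fast with a descriptive NullPointerException instead of an NPE deep inside the walk.
        Objects.requireNonNull(directory, "directory must not be null");
        Objects.requireNonNull(fileFilter, "fileFilter must not be null");
        if (!directory.exists()) {
            throw new IllegalArgumentException(String.format("The directory %s does not exist.", directory));
        }
        if (!directory.isDirectory()) {
            throw new IllegalArgumentException(String.format("%s is not a directory.", directory));
        }
    }
}
```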


[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-25 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r339283193
 
 

 ##
 File path: flink-core/src/main/java/org/apache/flink/util/FileUtils.java
 ##
 @@ -525,6 +540,102 @@ public static Path expandDirectory(Path file, Path targetDirectory) throws IOExc
return new Path(targetDirectory, rootDir);
}
 
+   /**
+* List the directory {@code directory} recursively and return the files that satisfies the {@code fileFilter}.
+*
+* @param directory the directory to be listed
+* @param fileFilter a file filter
+* @return a collection of {@code File}s
+*
+* @throws IOException if an I/O error occurs while listing the files in the given directory
+*/
+   public static Collection<File> listFilesInPath(@Nonnull final File directory, @Nonnull final Predicate<File> fileFilter) throws IOException {
+   if (!Files.exists(directory.toPath())) {
+   throw new IllegalArgumentException(String.format("The directory %s dose not exist.", directory));
+   }
+   if (!Files.isDirectory(directory.toPath())) {
+   throw new IllegalArgumentException(String.format("The %s is not a directory.", directory));
+   }
+
+   final FilterFileVisitor filterFileVisitor = new FilterFileVisitor(fileFilter);
+
+   Files.walkFileTree(
+   directory.toPath(),
+   EnumSet.of(FileVisitOption.FOLLOW_LINKS),
+   Integer.MAX_VALUE,
+   filterFileVisitor
+   );
+   return filterFileVisitor.getFiles().isEmpty() ? Collections.emptyList() : filterFileVisitor.getFiles();
 
 Review comment:
   ```suggestion
return filterFileVisitor.getFiles();
   ```
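A minimal sketch of the listing logic with the suggestion applied. The body of Flink's `FilterFileVisitor` is not shown in this thread, so the nested visitor below is a hypothetical stand-in built on `SimpleFileVisitor`, and argument validation is omitted for brevity. Because the visitor's list starts out empty, returning it directly already yields an empty collection when nothing matches.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.FileVisitOption;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical, simplified stand-in for FileUtils#listFilesInPath and its visitor. */
final class ListFilesSketch {

    private ListFilesSketch() {}

    /** Collects every visited file accepted by the filter. */
    private static final class FilterFileVisitor extends SimpleFileVisitor<Path> {
        private final Predicate<File> fileFilter;
        private final List<File> files = new ArrayList<>();

        FilterFileVisitor(Predicate<File> fileFilter) {
            this.fileFilter = fileFilter;
        }

        @Override
        public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
            FileVisitResult result = super.visitFile(file, attrs);
            if (fileFilter.test(file.toFile())) {
                files.add(file.toFile());
            }
            return result;
        }

        List<File> getFiles() {
            return files;
        }
    }

    static Collection<File> listFilesInPath(File directory, Predicate<File> fileFilter) throws IOException {
        FilterFileVisitor visitor = new FilterFileVisitor(fileFilter);
        Files.walkFileTree(directory.toPath(), EnumSet.of(FileVisitOption.FOLLOW_LINKS), Integer.MAX_VALUE, visitor);
        // Return the visitor's list as-is: when nothing matched, it is simply empty.
        return visitor.getFiles();
    }
}
```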


[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-22 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r337468217
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetriever.java
 ##
 @@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.FileVisitOption;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ *  Abstract class for the JobGraphRetriever, which wants to get classpath user's code depends on.
+ */
+public abstract class AbstractUserClassPathJobGraphRetriever implements JobGraphRetriever {
+
+   protected static final Logger LOG = LoggerFactory.getLogger(AbstractUserClassPathJobGraphRetriever.class);
 
 Review comment:
   It can be private since it is static now. Future subclasses such as `ClassPathJobGraphRetriever` will have their own loggers.
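A small sketch of the pattern the comment describes: each class declares its own `private static final` logger, so log lines are attributed to the concrete class rather than the abstract base. The original uses SLF4J's `LoggerFactory.getLogger(Class)`; this sketch swaps in `java.util.logging` to stay dependency-free, and both class names are hypothetical.

```java
import java.util.logging.Logger;

/** Hypothetical base class: its logger is private, so subclasses do not inherit it. */
abstract class BaseRetriever {
    private static final Logger LOG = Logger.getLogger(BaseRetriever.class.getName());

    static String baseLoggerName() {
        return LOG.getName();
    }
}

/** Hypothetical subclass declaring its own logger, so its log lines carry the subclass name. */
final class ClassPathRetriever extends BaseRetriever {
    private static final Logger LOG = Logger.getLogger(ClassPathRetriever.class.getName());

    static String loggerName() {
        return LOG.getName();
    }
}
```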


[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-22 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r337462709
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetriever.java
 ##
 @@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.FileVisitOption;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ *  Abstract class for the JobGraphRetriever, which wants to get classpath user's code depends on.
+ */
+public abstract class AbstractUserClassPathJobGraphRetriever implements JobGraphRetriever {
+
+   protected static final Logger LOG = LoggerFactory.getLogger(AbstractUserClassPathJobGraphRetriever.class);
+
+   /** The directory contains all the jars, which user code depends on. */
+   @Nullable
+   private final String jobDir;
+
+   private List<URL> userClassPaths;
+
+   public AbstractUserClassPathJobGraphRetriever(String jobDir) {
+   this.jobDir = jobDir;
+   }
+
+   public List<URL> getUserClassPaths() throws IOException {
+   if (userClassPaths == null) {
+   userClassPaths = getRelativeJarsURLFromDir(jobDir);
+   }
+   return userClassPaths;
+   }
+
+   /**
+* Scan all the jar files in the {@code dir} and return all these jar files' relative URLs to "user.dir".
+* @param dir the dir needed to scan the jar files
+* @return the jar files' relative URLs
+* @throws IOException
+*/
+   private List<URL> getRelativeJarsURLFromDir(String dir) throws IOException {
+
+   if (dir == null) {
+   return Collections.emptyList();
+   }
+
+   final List<URL> jarURLs = new LinkedList<>();
+   if (!Files.exists(Paths.get(dir))) {
+   throw new IllegalArgumentException("the job dir " + dir + " dose not exists.");
+   }
+   if (!Files.isDirectory(Paths.get(dir))) {
+   throw new IllegalArgumentException("the job dir " + dir + " is not a directory.");
+   }
+
+   Path dirPath;
+   if (Paths.get(dir).isAbsolute()) {
+   dirPath = Paths.get(System.getProperty("user.dir")).relativize(Paths.get(dir));
+   } else {
+   dirPath = Paths.get(dir);
+   }
+   Files.walkFileTree(
+   dirPath,
+   EnumSet.of(FileVisitOption.FOLLOW_LINKS),
+   Integer.MAX_VALUE,
+   new SimpleFileVisitor<Path>() {
+
+   @Override
+   public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs)
+   throws IOException {
+   FileVisitResult fileVisitResult = super.visitFile(file, attrs);
+   if (file.getFileName().toString().endsWith(".jar")) {
+   LOG.info("add " + file.toString() + " to user classpath");
+   jarURLs.add(
 
 Review comment:
   bad indentation.


[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-22 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r337459475
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetrieverTest.java
 ##
 @@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link AbstractUserClassPathJobGraphRetriever}.
+ */
+public class AbstractUserClassPathJobGraphRetrieverTest extends TestLogger {
+
+   @Rule
+   public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   /**
+* Test class.
 
 Review comment:
   It's better to remove this useless comment.


[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-22 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r337458869
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetriever.java
 ##
 @@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.FileVisitOption;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ *  Abstract class for the JobGraphRetriever, which wants to get classpath user's code depends on.
+ */
+
+public abstract class AbstractUserClassPathJobGraphRetriever implements JobGraphRetriever {
+
+   protected static final Logger LOG = LoggerFactory.getLogger(AbstractUserClassPathJobGraphRetriever.class);
+
+   public static final String DEFAULT_JOB_DIR = "job";
+
+   /** The directory contains all the jars, which user code depends on. */
+   @Nullable
+   private final String jobDir;
+
+   private List<URL> userClassPaths;
+
+   public AbstractUserClassPathJobGraphRetriever(String jobDir) {
+   this.jobDir = jobDir;
+   }
+
+   public List<URL> getUserClassPaths() throws IOException {
+   if (userClassPaths == null) {
+   userClassPaths = scanJarsInJobClassDir(jobDir);
+   }
+   return userClassPaths;
+   }
+
+   private List<URL> scanJarsInJobClassDir(String dir) throws IOException {
+
+   if (dir == null) {
+   return Collections.emptyList();
+   }
+
+   final File dirFile = new File(new Path(dir).toString());
+   final List<URL> jarURLs = new LinkedList<>();
+
+   if (!dirFile.exists()) {
+   LOG.warn("the job dir " + dirFile + " dose not exists.");
+   return Collections.emptyList();
+   }
+   if (!dirFile.isDirectory()) {
+   LOG.warn("the job dir " + dirFile + " is not a directory.");
+   return Collections.emptyList();
+   }
+
+   Files.walkFileTree(dirFile.toPath(),
+   EnumSet.of(FileVisitOption.FOLLOW_LINKS),
+   Integer.MAX_VALUE,
+   new SimpleFileVisitor<java.nio.file.Path>(){
+
+   @Override
+   public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs)
+   throws IOException {
+   FileVisitResult fileVisitResult = super.visitFile(file, attrs);
+   if (file.getFileName().toString().endsWith(".jar")) {
+   LOG.info("add " + file.toString() + " to user classpath");
+   if (file.isAbsolute()) {
+   jarURLs.add(file.toUri().toURL());
+   } else {
+   jarURLs.add(
+   new URL(new URL(file.getFileName().toUri().getScheme() + ":"), file.toString())
 
 Review comment:
   > MalformedURLException - if no protocol is specified, or an unknown protocol is found, or spec is null.
   
   OK. It looks like the main reason is that a protocol must be specified to avoid a `MalformedURLException`.
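A minimal standalone illustration of that point, assuming a Unix-style relative path such as `sub/file.jar` (the class and method names are hypothetical): `new URL(String)` rejects a spec that has no protocol, while resolving the same spec against a context URL that carries only the `file` protocol succeeds and keeps the path relative.

```java
import java.net.MalformedURLException;
import java.net.URL;

/** Hypothetical demo: why a relative jar path needs a protocol when building a URL. */
final class RelativeUrlDemo {

    /** Returns true if new URL(spec) rejects the spec, e.g. with "no protocol: sub/file.jar". */
    static boolean rejectedWithoutProtocol(String spec) {
        try {
            new URL(spec);
            return false;
        } catch (MalformedURLException e) {
            return true;
        }
    }

    /** Resolves a relative path against a context URL that only carries the "file" protocol. */
    static URL resolveAgainstFileProtocol(String relativePath) {
        try {
            // The context supplies the protocol; its path is empty, so the result stays relative.
            return new URL(new URL("file:"), relativePath);
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Cannot build a URL for " + relativePath, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectedWithoutProtocol("sub/file.jar"));    // true
        System.out.println(resolveAgainstFileProtocol("sub/file.jar")); // file:sub/file.jar
    }
}
```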
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-21 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r337329436
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetriever.java
 ##
 @@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.FileVisitOption;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Abstract class for JobGraphRetrievers that need the classpath which the user code depends on.
+ */
+
+public abstract class AbstractUserClassPathJobGraphRetriever implements JobGraphRetriever {
+
+   protected static final Logger LOG = LoggerFactory.getLogger(AbstractUserClassPathJobGraphRetriever.class);
+
+   public static final String DEFAULT_JOB_DIR = "job";
 
 Review comment:
   This default value is used both for deploying user jars and for scanning them.
   I think it's better to move it to a common place, such as `ConfigConstants`.
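A sketch of what "one common place" buys: both the deployment side and the scanning side read the same constant, so they cannot drift apart. All class and method names here are hypothetical; the review only suggests a shared location such as `ConfigConstants`, and nothing below claims that class contains this constant.

```java
import java.nio.file.Path;

/** Hypothetical shared holder; stands in for a common place such as ConfigConstants. */
final class JobDirDefaults {
    /** Single source of truth for the default job directory name. */
    static final String DEFAULT_JOB_DIR = "job";

    private JobDirDefaults() {}

    static Path defaultJobDir(Path workingDir) {
        return workingDir.resolve(DEFAULT_JOB_DIR);
    }
}

/** Hypothetical deployment side: where user jars are shipped to. */
final class UserJarDeployer {
    static Path targetDir(Path workingDir) {
        return JobDirDefaults.defaultJobDir(workingDir);
    }
}

/** Hypothetical scanning side: where user jars are looked up. */
final class UserJarScanner {
    static Path dirToScan(Path workingDir) {
        return JobDirDefaults.defaultJobDir(workingDir);
    }
}
```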


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-21 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r337329154
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetriever.java
 ##
 @@ -0,0 +1,115 @@
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.FileVisitOption;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Abstract class for JobGraphRetrievers that need the classpath which the user code depends on.
+ */
+
+public abstract class AbstractUserClassPathJobGraphRetriever implements JobGraphRetriever {
+
+   protected static final Logger LOG = LoggerFactory.getLogger(AbstractUserClassPathJobGraphRetriever.class);
+
+   public static final String DEFAULT_JOB_DIR = "job";
+
+   /** The directory containing all the jars that the user code depends on. */
+   @Nullable
+   private final String jobDir;
+
+   private List<URL> userClassPaths;
+
+   public AbstractUserClassPathJobGraphRetriever(String jobDir) {
+   this.jobDir = jobDir;
+   }
+
+   public List<URL> getUserClassPaths() throws IOException {
+   if (userClassPaths == null) {
+   userClassPaths = scanJarsInJobClassDir(jobDir);
+   }
+   return userClassPaths;
+   }
+
+   private List<URL> scanJarsInJobClassDir(String dir) throws IOException {
+
+   if (dir == null) {
+   return Collections.emptyList();
+   }
+
+   final File dirFile = new File(new Path(dir).toString());
+   final List<URL> jarURLs = new LinkedList<>();
+
+   if (!dirFile.exists()) {
+   LOG.warn("the job dir " + dirFile + " does not exist.");
+   return Collections.emptyList();
+   }
+   if (!dirFile.isDirectory()) {
+   LOG.warn("the job dir " + dirFile + " is not a directory.");
+   return Collections.emptyList();
+   }
+
+   Files.walkFileTree(dirFile.toPath(),
+   EnumSet.of(FileVisitOption.FOLLOW_LINKS),
+   Integer.MAX_VALUE,
+   new SimpleFileVisitor<java.nio.file.Path>(){
+
+   @Override
+   public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs)
+   throws IOException {
+   FileVisitResult fileVisitResult = super.visitFile(file, attrs);
+   if (file.getFileName().toString().endsWith(".jar")) {
+   LOG.info("add " + file.toString() + " to user classpath");
+   if (file.isAbsolute()) {
 
 Review comment:
   How about making all URLs relative?
   I think relative paths could supersede absolute paths in all classpath use cases.
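One way to realize the "all relative" suggestion, sketched under assumptions: every jar path, absolute or not, is first made relative to a working directory with `Path.relativize`, and the result is turned into a `file:` URL with the same context-URL trick the PR uses. `RelativeClassPath` and `toRelativeUrl` are hypothetical names.

```java
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Path;

/** Hypothetical helper: express every user jar as a URL relative to the working directory. */
final class RelativeClassPath {

    static URL toRelativeUrl(Path workingDir, Path jar) {
        // relativize() requires both paths to be of the same kind, so make both absolute first.
        Path base = workingDir.toAbsolutePath().normalize();
        Path relative = base.relativize(jar.toAbsolutePath().normalize());
        // URL paths always use '/', whatever the platform's file separator is.
        String spec = relative.toString().replace(File.separatorChar, '/');
        try {
            return new URL(new URL("file:"), spec);
        } catch (MalformedURLException e) {
            throw new IllegalArgumentException("Cannot convert " + jar + " to a relative URL", e);
        }
    }
}
```

A relative `file:` URL only has meaning with respect to the working directory of whichever process later opens it, so this approach presumably assumes that every consumer of the classpath runs in the same working directory.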




[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-21 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r337133659
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetriever.java
 ##
 @@ -0,0 +1,115 @@
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.FileVisitOption;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Abstract class for JobGraphRetrievers that need the classpath which the user code depends on.
+ */
+
+public abstract class AbstractUserClassPathJobGraphRetriever implements JobGraphRetriever {
+
+   protected static final Logger LOG = LoggerFactory.getLogger(AbstractUserClassPathJobGraphRetriever.class);
+
+   public static final String DEFAULT_JOB_DIR = "job";
+
+   /** The directory containing all the jars that the user code depends on. */
+   @Nullable
+   private final String jobDir;
+
+   private List<URL> userClassPaths;
+
+   public AbstractUserClassPathJobGraphRetriever(String jobDir) {
+   this.jobDir = jobDir;
+   }
+
+   public List<URL> getUserClassPaths() throws IOException {
+   if (userClassPaths == null) {
+   userClassPaths = scanJarsInJobClassDir(jobDir);
+   }
+   return userClassPaths;
+   }
+
+   private List<URL> scanJarsInJobClassDir(String dir) throws IOException {
+
+   if (dir == null) {
+   return Collections.emptyList();
+   }
+
+   final File dirFile = new File(new Path(dir).toString());
+   final List<URL> jarURLs = new LinkedList<>();
+
+   if (!dirFile.exists()) {
+   LOG.warn("the job dir " + dirFile + " does not exist.");
+   return Collections.emptyList();
+   }
+   if (!dirFile.isDirectory()) {
+   LOG.warn("the job dir " + dirFile + " is not a directory.");
 
 Review comment:
   The same as above. Why not fail here if a bad job dir is specified?
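A sketch of the fail-fast alternative the question points at: reject a missing or non-directory job dir instead of logging a warning and silently returning an empty classpath. Throwing `IllegalArgumentException` is an assumption for illustration; the PR may well prefer a checked or Flink-specific exception.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical fail-fast variant of the job dir checks. */
final class JobDirValidation {

    static Path checkJobDir(String dir) {
        Path path = Paths.get(dir);
        if (!Files.exists(path)) {
            throw new IllegalArgumentException("The job dir " + path + " does not exist.");
        }
        if (!Files.isDirectory(path)) {
            throw new IllegalArgumentException("The job dir " + path + " is not a directory.");
        }
        return path;
    }
}
```

Failing here surfaces a misconfigured job dir at startup, rather than later as user classes that cannot be loaded from an unexpectedly empty classpath.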




[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-21 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r337114049
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetrieverTest.java
 ##
 @@ -0,0 +1,148 @@
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link AbstractUserClassPathJobGraphRetriever}.
+ */
+public class AbstractUserClassPathJobGraphRetrieverTest extends TestLogger {
+
+   @Rule
+   public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   /**
+* Test class.
+*/
+   public static class TestJobGraphRetriever extends AbstractUserClassPathJobGraphRetriever {
+
+   public TestJobGraphRetriever(String jobDir) {
+   super(jobDir);
+   }
+
+   @Override
+   public JobGraph retrieveJobGraph(Configuration configuration) {
+   return null;
+   }
+   }
+
+   @Test
+   public void testGetUserClassPathFromAFile() throws IOException {
+   final String fileName = "a.jar";
+   File file = temporaryFolder.newFile(fileName);
+
+   TestJobGraphRetriever testJobGraphRetriever = new TestJobGraphRetriever(file.getAbsolutePath());
+
+   assertEquals(Collections.emptyList(), testJobGraphRetriever.getUserClassPaths());
+   }
+
+   @Test
+   public void testGetUserClassPathFormADoesNotExistsFile() throws IOException {
+   final String fileName = "_does_not_exists_file";
+   final String doesNotExistsFilePath = temporaryFolder.getRoot() + "/" + fileName;
+
+   TestJobGraphRetriever testJobGraphRetriever = new TestJobGraphRetriever(doesNotExistsFilePath);
+
+   assertEquals(Collections.emptyList(), testJobGraphRetriever.getUserClassPaths());
+
+   }
+
+   @Test
+   public void testGetUserClassPath() throws IOException {
+
+   final Path jobDir = temporaryFolder.newFolder("_job_dir").toPath();
+   final Path jobSubDir1 = Files.createDirectory(jobDir.resolve("_sub_dir1"));
+   final Path jobSubDir2 = Files.createDirectory(jobDir.resolve("_sub_dir2"));
+   final Path jarFile1 = Files.createFile(jobDir.resolve("file1.jar"));
+   final Path jarFile2 = Files.createFile(jobDir.resolve("file2.jar"));
+   final Path jarFile3 = Files.createFile(jobSubDir1.resolve("file3.jar"));
+   final Path jarFile4 = Files.createFile(jobSubDir2.resolve("file4.jar"));
+
+   Files.createFile(jobDir.resolve("file1.txt"));
+   Files.createFile(jobSubDir2.resolve("file2.txt"));
+
+   List<URL> expectedUrls = Arrays.asList(jarFile1.toUri().toURL(),
 
 Review comment:
   It's better to put the first argument on a new line, since the other arguments are already on their own lines.
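The suggested layout, shown in a small hypothetical helper: every argument of `Arrays.asList`, including the first, starts on its own line. The order-insensitive comparison is an extra assumption, added because `Files.walkFileTree` does not promise any particular visiting order; it compares the external string forms to sidestep `URL.equals`, which may consult host resolution.

```java
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical helper mirroring the expected-URL setup in the test. */
final class ExpectedJarUrls {

    static List<URL> expectedUrls(Path jarFile1, Path jarFile2, Path jarFile3, Path jarFile4)
            throws MalformedURLException {
        // Every argument, including the first, starts on its own line.
        return Arrays.asList(
            jarFile1.toUri().toURL(),
            jarFile2.toUri().toURL(),
            jarFile3.toUri().toURL(),
            jarFile4.toUri().toURL());
    }

    /** Compares ignoring order; assumes neither list contains duplicates. */
    static boolean sameUrls(List<URL> expected, List<URL> actual) {
        Set<String> a = expected.stream().map(URL::toExternalForm).collect(Collectors.toSet());
        Set<String> b = actual.stream().map(URL::toExternalForm).collect(Collectors.toSet());
        return expected.size() == actual.size() && a.equals(b);
    }
}
```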




[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-21 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r337127092
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetriever.java
 ##
 @@ -0,0 +1,115 @@
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.FileVisitOption;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Abstract class for JobGraphRetrievers that need the classpath which the user code depends on.
+ */
+
 
 Review comment:
   The empty line should be removed.




[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-21 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r337131201
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetriever.java
 ##
 @@ -0,0 +1,115 @@
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.FileVisitOption;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Abstract class for JobGraphRetrievers that need the classpath which the user code depends on.
+ */
+
+public abstract class AbstractUserClassPathJobGraphRetriever implements JobGraphRetriever {
+
+   protected static final Logger LOG = LoggerFactory.getLogger(AbstractUserClassPathJobGraphRetriever.class);
+
+   public static final String DEFAULT_JOB_DIR = "job";
+
+   /** The directory containing all the jars that the user code depends on. */
+   @Nullable
+   private final String jobDir;
+
+   private List<URL> userClassPaths;
+
+   public AbstractUserClassPathJobGraphRetriever(String jobDir) {
+   this.jobDir = jobDir;
+   }
+
+   public List<URL> getUserClassPaths() throws IOException {
+   if (userClassPaths == null) {
+   userClassPaths = scanJarsInJobClassDir(jobDir);
+   }
+   return userClassPaths;
+   }
+
+   private List<URL> scanJarsInJobClassDir(String dir) throws IOException {
+
+   if (dir == null) {
+   return Collections.emptyList();
+   }
+
+   final File dirFile = new File(new Path(dir).toString());
+   final List<URL> jarURLs = new LinkedList<>();
+
+   if (!dirFile.exists()) {
+   LOG.warn("the job dir " + dirFile + " does not exist.");
+   return Collections.emptyList();
+   }
+   if (!dirFile.isDirectory()) {
+   LOG.warn("the job dir " + dirFile + " is not a directory.");
+   return Collections.emptyList();
+   }
+
+   Files.walkFileTree(dirFile.toPath(),
+   EnumSet.of(FileVisitOption.FOLLOW_LINKS),
+   Integer.MAX_VALUE,
+   new SimpleFileVisitor<java.nio.file.Path>(){
+
+   @Override
+   public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs)
+   throws IOException {
+   FileVisitResult fileVisitResult = super.visitFile(file, attrs);
+   if (file.getFileName().toString().endsWith(".jar")) {
+   LOG.info("add " + file.toString() + " to user classpath");
+   if (file.isAbsolute()) {
+   jarURLs.add(file.toUri().toURL());
+   } else {
+   jarURLs.add(
+   new URL(new URL(file.getFileName().toUri().getScheme() + ":"), file.toString())
+   );
+   }
+   }
+   return fileVisitResult;
+   }
+   });
+
+   if (jarURLs.isEmpty()) {
+   return Collections.emptyList();
+   } else {
+   return jarURLs;
 
 Review comment:
   Here's an unexpected extra indentation.

[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-21 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r337129643
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetriever.java
 ##
 @@ -0,0 +1,115 @@
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.FileVisitOption;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Abstract class for JobGraphRetrievers that need the classpath which the user code depends on.
+ */
+
+public abstract class AbstractUserClassPathJobGraphRetriever implements JobGraphRetriever {
+
+   protected static final Logger LOG = LoggerFactory.getLogger(AbstractUserClassPathJobGraphRetriever.class);
+
+   public static final String DEFAULT_JOB_DIR = "job";
+
+   /** The directory containing all the jars that the user code depends on. */
+   @Nullable
+   private final String jobDir;
+
+   private List<URL> userClassPaths;
+
+   public AbstractUserClassPathJobGraphRetriever(String jobDir) {
+   this.jobDir = jobDir;
+   }
+
+   public List<URL> getUserClassPaths() throws IOException {
+   if (userClassPaths == null) {
+   userClassPaths = scanJarsInJobClassDir(jobDir);
+   }
+   return userClassPaths;
+   }
+
+   private List<URL> scanJarsInJobClassDir(String dir) throws IOException {
+
+   if (dir == null) {
+   return Collections.emptyList();
+   }
+
+   final File dirFile = new File(new Path(dir).toString());
+   final List<URL> jarURLs = new LinkedList<>();
+
+   if (!dirFile.exists()) {
+   LOG.warn("the job dir " + dirFile + " does not exist.");
+   return Collections.emptyList();
+   }
+   if (!dirFile.isDirectory()) {
+   LOG.warn("the job dir " + dirFile + " is not a directory.");
+   return Collections.emptyList();
+   }
+
+   Files.walkFileTree(dirFile.toPath(),
+   EnumSet.of(FileVisitOption.FOLLOW_LINKS),
+   Integer.MAX_VALUE,
+   new SimpleFileVisitor<java.nio.file.Path>(){
+
+   @Override
 
 Review comment:
   It looks to me like this block has bad indentation. Cutting and pasting the lines may fix it.




[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-21 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r337132189
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetrieverTest.java
 ##
 @@ -0,0 +1,148 @@
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link AbstractUserClassPathJobGraphRetriever}.
+ */
+public class AbstractUserClassPathJobGraphRetrieverTest extends TestLogger {
+
+   @Rule
+   public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   /**
+* Test class.
 
 Review comment:
   Maybe `Test class for {@link AbstractUserClassPathJobGraphRetriever}`.
   Or remove the comment and make this class private.
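The second option in miniature: a test-only subclass declared as a `private static` nested class, so it is scoped to the one test that needs it. `AbstractJobDirHolder` is a hypothetical stand-in for the abstract retriever, kept free of Flink and JUnit types so the sketch compiles on its own.

```java
/** Hypothetical stand-in for the abstract class under test. */
abstract class AbstractJobDirHolder {
    private final String jobDir;

    AbstractJobDirHolder(String jobDir) {
        this.jobDir = jobDir;
    }

    String getJobDir() {
        return jobDir;
    }

    /** The abstract hook that concrete subclasses must implement. */
    abstract Object retrieve();
}

final class AbstractJobDirHolderTest {

    /** Minimal test implementation; private because no other class needs it. */
    private static final class TestHolder extends AbstractJobDirHolder {
        TestHolder(String jobDir) {
            super(jobDir);
        }

        @Override
        Object retrieve() {
            return null; // not exercised by these tests
        }
    }

    static String jobDirSeenBySubclass(String jobDir) {
        return new TestHolder(jobDir).getJobDir();
    }
}
```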




[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-21 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] 
Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r337149230
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetriever.java
 ##
 @@ -0,0 +1,115 @@
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.FileVisitOption;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Abstract class for JobGraphRetrievers that need the classpath which the user code depends on.
+ */
+
+public abstract class AbstractUserClassPathJobGraphRetriever implements JobGraphRetriever {
+
+   protected static final Logger LOG = LoggerFactory.getLogger(AbstractUserClassPathJobGraphRetriever.class);
+
+   public static final String DEFAULT_JOB_DIR = "job";
+
+   /** The directory containing all the jars that the user code depends on. */
+   @Nullable
+   private final String jobDir;
+
+   private List<URL> userClassPaths;
+
+   public AbstractUserClassPathJobGraphRetriever(String jobDir) {
+   this.jobDir = jobDir;
+   }
+
+   public List<URL> getUserClassPaths() throws IOException {
+   if (userClassPaths == null) {
+   userClassPaths = scanJarsInJobClassDir(jobDir);
+   }
+   return userClassPaths;
+   }
+
+   private List<URL> scanJarsInJobClassDir(String dir) throws IOException {
+
+   if (dir == null) {
+   return Collections.emptyList();
+   }
+
+   final File dirFile = new File(new Path(dir).toString());
+   final List<URL> jarURLs = new LinkedList<>();
+
+   if (!dirFile.exists()) {
+   LOG.warn("the job dir " + dirFile + " does not exist.");
+   return Collections.emptyList();
+   }
+   if (!dirFile.isDirectory()) {
+   LOG.warn("the job dir " + dirFile + " is not a directory.");
+   return Collections.emptyList();
+   }
+
+   Files.walkFileTree(dirFile.toPath(),
+   EnumSet.of(FileVisitOption.FOLLOW_LINKS),
+   Integer.MAX_VALUE,
+   new SimpleFileVisitor<java.nio.file.Path>(){
+
+   @Override
+   public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs)
+   throws IOException {
+   FileVisitResult fileVisitResult = super.visitFile(file, attrs);
+   if (file.getFileName().toString().endsWith(".jar")) {
+   LOG.info("add " + file.toString() + " to user classpath");
+   if (file.isAbsolute()) {
+   jarURLs.add(file.toUri().toURL());
+   } else {
+   jarURLs.add(
+   new URL(new URL(file.getFileName().toUri().getScheme() + ":"), file.toString())
 
 Review comment:
   Do you have a formal reference for why a `:` works here?
   Could you add a comment explaining what this is meant to generate?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.

[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-21 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r337126118
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetriever.java
 ##
 @@ -0,0 +1,115 @@
+   Files.walkFileTree(dirFile.toPath(),
+   EnumSet.of(FileVisitOption.FOLLOW_LINKS),
+   Integer.MAX_VALUE,
+   new SimpleFileVisitor<java.nio.file.Path>(){
 
 Review comment:
   Missing a space in front of "{".




[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-21 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r337126350
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetriever.java
 ##
 @@ -0,0 +1,115 @@
+   if (!dirFile.isDirectory()) {
+   LOG.warn("the job dir " + dirFile + " is not a directory.");
+   return Collections.emptyList();
+   }
+
+   Files.walkFileTree(dirFile.toPath(),
 
 Review comment:
   It's better to move the first param to a new line.
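Applied together, the layout suggestions in this and the neighbouring comments (first argument on its own line, a space before `{`, and an extra indentation level for a wrapped `throws`) might look like the sketch below. It is a standalone approximation rather than the PR's code: `JarScannerLayout` and `scanJars` are hypothetical names, it uses `java.nio.file.Path` directly instead of Flink's `Path`, and it leaves out the logging and the relative-path branch.

```java
import java.io.IOException;
import java.net.URL;
import java.nio.file.FileVisitOption;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;

public class JarScannerLayout {

	/** Collects the URLs of all ".jar" files below {@code dir}, following symlinks. */
	static List<URL> scanJars(Path dir) throws IOException {
		final List<URL> jarURLs = new ArrayList<>();

		// First argument moved to its own line, one argument per line.
		Files.walkFileTree(
			dir,
			EnumSet.of(FileVisitOption.FOLLOW_LINKS),
			Integer.MAX_VALUE,
			new SimpleFileVisitor<Path>() { // space before "{"

				@Override
				public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
						throws IOException { // extra indentation sets "throws" apart from the body
					if (file.getFileName().toString().endsWith(".jar")) {
						jarURLs.add(file.toUri().toURL());
					}
					return FileVisitResult.CONTINUE;
				}
			});

		return jarURLs;
	}
}
```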




[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-21 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r337125149
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetriever.java
 ##
 @@ -0,0 +1,115 @@
+   private List<URL> scanJarsInJobClassDir(String dir) throws IOException {
+
+   if (dir == null) {
+   return Collections.emptyList();
+   }
+
+   final File dirFile = new File(new Path(dir).toString());
+   final List<URL> jarURLs = new LinkedList<>();
+
+   if (!dirFile.exists()) {
+   LOG.warn("the job dir " + dirFile + " does not exist.");
+   return Collections.emptyList();
 
 Review comment:
   Why not fail here if a bad job dir is specified?
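If the answer is to fail fast, one possible shape is to validate the directory up front and throw instead of logging a warning. This is only a sketch, assuming a `null` directory should still mean "no user jars configured"; `JobDirValidation` and `checkJobDir` are hypothetical names, and throwing `FileNotFoundException` and `NotDirectoryException` is an illustrative choice, not something the PR specifies.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NotDirectoryException;
import java.nio.file.Path;

public class JobDirValidation {

	/**
	 * Fails fast on a misconfigured job dir instead of silently returning no jars.
	 * A null dir is still treated as "no user jars configured".
	 */
	static void checkJobDir(Path dir) throws IOException {
		if (dir == null) {
			return;
		}
		if (!Files.exists(dir)) {
			throw new FileNotFoundException("The job dir " + dir + " does not exist.");
		}
		if (!Files.isDirectory(dir)) {
			throw new NotDirectoryException(dir.toString());
		}
	}
}
```

The scanning method would call this before `Files.walkFileTree`, so a mistyped job dir would surface at startup rather than later as a missing user class.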




[GitHub] [flink] zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever

2019-10-21 Thread GitBox
zhuzhurk commented on a change in pull request #9950: [FLINK-14464][runtime] Introduce the AbstractUserClassPathJobGraphRetriever
URL: https://github.com/apache/flink/pull/9950#discussion_r337130384
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetriever.java
 ##
 @@ -0,0 +1,115 @@
+   Files.walkFileTree(dirFile.toPath(),
+   EnumSet.of(FileVisitOption.FOLLOW_LINKS),
+   Integer.MAX_VALUE,
+   new SimpleFileVisitor<java.nio.file.Path>(){
+
+   @Override
+   public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs)
+   throws IOException {
 
 Review comment:
   The wrapped `throws` clause needs one more level of indentation to distinguish it from the method body.

