[GitHub] [flink] zhuzhurk commented on a change in pull request #9715: [FLINK-13993][runtime] Using FlinkUserCodeClassLoaders to load the user class in the perjob mode

2019-09-28 Thread GitBox
zhuzhurk commented on a change in pull request #9715: [FLINK-13993][runtime] 
Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
URL: https://github.com/apache/flink/pull/9715#discussion_r329319129
 
 

 ##
 File path: 
flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
 ##
 @@ -110,12 +121,14 @@ public JobGraph retrieveJobGraph(Configuration configuration) throws FlinkExcept
}
}
 
-   private PackagedProgram createPackagedProgram() throws FlinkException {
+   private PackagedProgram createPackagedProgram() throws FlinkException, IOException {
final String entryClass = getJobClassNameOrScanClassPath();
try {
-   final Class mainClass = getClass().getClassLoader().loadClass(entryClass);
-   return new PackagedProgram(mainClass, programArguments);
-   } catch (ClassNotFoundException | ProgramInvocationException e) {
+   final String userClassPaths = getUserClassPaths().
+   stream().map(URL::toString).collect(Collectors.joining(":"));
 
 Review comment:
   According to the [code style guide](https://flink.apache.org/contributing/code-style-and-quality-formatting.html#breaking-the-lines-of-too-long-statements), each call should be on a new line when breaking long statements.
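   For illustration, here is a minimal sketch of the statement from the diff with one call per line. The names `UserClassPathJoiner` and `joinUserClassPaths` are hypothetical. Placing the dot at the start of each continuation line is one reading of the style rule, not a quote from the guide.

```java
import java.net.URL;
import java.util.List;
import java.util.stream.Collectors;

class UserClassPathJoiner {

    // Joins the user class path URLs with ':' as in the diff. The chain is
    // broken so that each call starts on its own line.
    static String joinUserClassPaths(List<URL> userClassPaths) {
        return userClassPaths
            .stream()
            .map(URL::toString)
            .collect(Collectors.joining(":"));
    }
}
```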
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #9715: [FLINK-13993][runtime] Using FlinkUserCodeClassLoaders to load the user class in the perjob mode

2019-09-28 Thread GitBox
zhuzhurk commented on a change in pull request #9715: [FLINK-13993][runtime] 
Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
URL: https://github.com/apache/flink/pull/9715#discussion_r329321484
 
 

 ##
 File path: 
flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java
 ##
 @@ -55,6 +66,62 @@
 
private static final String[] PROGRAM_ARGUMENTS = {"--arg", "suffix"};
 
+   private static URL testJobForUserClassLoaderJarURL = null;
+   private static URL testJobDependedJarURL = null;
+
+   private Path userDirHasEntryClass;
+   private Path userJarHasEntryClass;
+   private Path userEntryClassDependJarAtUserDirHasEntryClass;
+   private Path textAtUserDirHasEntryClass;
+
+   private Path userDirHasNotEntryClass;
+   private Path userJarHasNotEntryClass;
+   private Path userEntryClassDependJarAtUserDirHasNotEntryClass;
+   private Path textAtUserDirHasNotEntryClass;
+
+
 
 Review comment:
   These duplicated empty lines can be removed.


[GitHub] [flink] zhuzhurk commented on a change in pull request #9715: [FLINK-13993][runtime] Using FlinkUserCodeClassLoaders to load the user class in the perjob mode

2019-09-28 Thread GitBox
zhuzhurk commented on a change in pull request #9715: [FLINK-13993][runtime] 
Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
URL: https://github.com/apache/flink/pull/9715#discussion_r329320115
 
 

 ##
 File path: 
flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
 ##
 @@ -77,7 +81,9 @@ private StandaloneJobClusterEntryPoint(
protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) {
return new JobDispatcherResourceManagerComponentFactory(
StandaloneResourceManagerFactory.INSTANCE,
-   new ClassPathJobGraphRetriever(jobId, savepointRestoreSettings, programArguments, jobClassName));
+   new ClassPathJobGraphRetriever(jobId, savepointRestoreSettings, programArguments, jobClassName,
+   Paths.get(System.getenv(ENV_FLINK_HOME_DIR) == null ? DEFAULT_FLINK_HOME : System.getenv(ENV_FLINK_HOME_DIR),
 
 Review comment:
   Hard-coding "/opt/flink" looks a bit fragile.
   Is it possible to set "FLINK_HOME" in all cases? Then we could throw an exception here for unexpected cases.
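   Here is a minimal sketch of the fail-fast variant. It assumes the environment variable is named `FLINK_HOME` and that an `IllegalStateException` is an acceptable failure; the real entry point may prefer a Flink-specific exception. `FlinkHomeResolver` and `resolveFlinkHomeDir` are hypothetical names. The environment is passed in as a map so the lookup can be tested without modifying the process environment.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;

class FlinkHomeResolver {

    // Assumed variable name, matching the "FLINK_HOME" mentioned in the review.
    static final String ENV_FLINK_HOME_DIR = "FLINK_HOME";

    // Returns FLINK_HOME as a path, or fails fast if it is missing or empty,
    // instead of falling back to a hard-coded "/opt/flink".
    static Path resolveFlinkHomeDir(Map<String, String> env) {
        String flinkHome = env.get(ENV_FLINK_HOME_DIR);
        if (flinkHome == null || flinkHome.isEmpty()) {
            throw new IllegalStateException(
                "The environment variable " + ENV_FLINK_HOME_DIR + " is not set.");
        }
        return Paths.get(flinkHome);
    }
}
```

   In the entry point this would be called as `FlinkHomeResolver.resolveFlinkHomeDir(System.getenv())`.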


[GitHub] [flink] zhuzhurk commented on a change in pull request #9715: [FLINK-13993][runtime] Using FlinkUserCodeClassLoaders to load the user class in the perjob mode

2019-09-28 Thread GitBox
zhuzhurk commented on a change in pull request #9715: [FLINK-13993][runtime] 
Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
URL: https://github.com/apache/flink/pull/9715#discussion_r329318669
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetriever.java
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.FileVisitOption;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ *  Abstract class for the JobGraphRetriever, which wants to get classpath user's code depends on.
+ */
+
+public abstract class AbstractUserClassPathJobGraphRetriever implements JobGraphRetriever {
+
+   private final Logger log = LoggerFactory.getLogger(getClass());
 
 Review comment:
   Alternatively, we could make it a private static logger of this class.
   Then `scanJarsInJobClassDir` could be made static as well.
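   Here is a rough sketch of the static-logger alternative. It uses `java.util.logging` as a stand-in for the SLF4J logger in the PR so that it runs without extra dependencies. With SLF4J, the field would hold `LoggerFactory.getLogger(...)` in the same position. The class name `StaticLoggerJobGraphRetriever` is hypothetical. The quoted diff does not show the whole method, so the scan logic (recursively collecting `*.jar` files) is an assumption about what `scanJarsInJobClassDir` does.

```java
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

abstract class StaticLoggerJobGraphRetriever {

    // One logger for the class itself. It does not depend on getClass(),
    // so static methods can use it.
    private static final Logger LOG =
        Logger.getLogger(StaticLoggerJobGraphRetriever.class.getName());

    // Uses no instance state, so it can be static. Recursively collects *.jar files.
    static List<URL> scanJarsInJobClassDir(String dir) throws IOException {
        if (dir == null) {
            return Collections.emptyList();
        }
        final File dirFile = new File(dir);
        if (!dirFile.isDirectory()) {
            LOG.warning("The job dir " + dirFile + " does not exist or is not a directory.");
            return Collections.emptyList();
        }
        final List<Path> jarFiles;
        try (Stream<Path> paths = Files.walk(dirFile.toPath())) {
            jarFiles = paths
                .filter(Files::isRegularFile)
                .filter(p -> p.getFileName().toString().endsWith(".jar"))
                .collect(Collectors.toList());
        }
        final List<URL> jarURLs = new ArrayList<>();
        for (Path jar : jarFiles) {
            jarURLs.add(jar.toUri().toURL());
        }
        return jarURLs;
    }
}
```

   The trade-off is naming. A static logger always logs under the base class name, whereas a logger created from `getClass()` logs under the concrete subclass.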


[GitHub] [flink] zhuzhurk commented on a change in pull request #9715: [FLINK-13993][runtime] Using FlinkUserCodeClassLoaders to load the user class in the perjob mode

2019-09-28 Thread GitBox
zhuzhurk commented on a change in pull request #9715: [FLINK-13993][runtime] 
Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
URL: https://github.com/apache/flink/pull/9715#discussion_r329314051
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetrieverTest.java
 ##
 @@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link AbstractUserClassPathJobGraphRetriever}.
+ */
+public class AbstractUserClassPathJobGraphRetrieverTest {
+
+   @Rule
+   public final TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   /**
+* Test class.
+*/
+   public static class TestJobGraphRetrieverTest extends AbstractUserClassPathJobGraphRetriever {
 
 Review comment:
   I guess the name should be `TestJobGraphRetriever`.


[GitHub] [flink] zhuzhurk commented on a change in pull request #9715: [FLINK-13993][runtime] Using FlinkUserCodeClassLoaders to load the user class in the perjob mode

2019-09-28 Thread GitBox
zhuzhurk commented on a change in pull request #9715: [FLINK-13993][runtime] 
Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
URL: https://github.com/apache/flink/pull/9715#discussion_r329317178
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetriever.java
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.FileVisitOption;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ *  Abstract class for the JobGraphRetriever, which wants to get classpath user's code depends on.
+ */
+
+public abstract class AbstractUserClassPathJobGraphRetriever implements JobGraphRetriever {
+
+   private final Logger log = LoggerFactory.getLogger(getClass());
 
 Review comment:
   I think the logger should be `protected`, and subclasses of `AbstractUserClassPathJobGraphRetriever` should use it rather than creating their own loggers.
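   Here is a minimal sketch of the protected-logger alternative, again with `java.util.logging` standing in for SLF4J. `AbstractRetrieverSketch` and `ClassPathRetrieverSketch` are hypothetical names. The logger is created from `getClass()`, so records from a subclass are attributed to that subclass even though the field is declared in the base class.

```java
import java.util.logging.Logger;

abstract class AbstractRetrieverSketch {

    // Protected so subclasses reuse it. getClass() resolves to the concrete
    // subclass at runtime, so log records carry the subclass name.
    protected final Logger log = Logger.getLogger(getClass().getName());

    String loggerName() {
        return log.getName();
    }
}

class ClassPathRetrieverSketch extends AbstractRetrieverSketch {

    void retrieve() {
        // No logger field of its own: the inherited one is used.
        log.info("Retrieving the job graph from the class path");
    }
}
```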


[GitHub] [flink] zhuzhurk commented on a change in pull request #9715: [FLINK-13993][runtime] Using FlinkUserCodeClassLoaders to load the user class in the perjob mode

2019-09-28 Thread GitBox
zhuzhurk commented on a change in pull request #9715: [FLINK-13993][runtime] 
Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
URL: https://github.com/apache/flink/pull/9715#discussion_r329317918
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetriever.java
 ##
 @@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.FileVisitOption;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ *  Abstract class for the JobGraphRetriever, which wants to get classpath user's code depends on.
+ */
+
+public abstract class AbstractUserClassPathJobGraphRetriever implements JobGraphRetriever {
+
+   private final Logger log = LoggerFactory.getLogger(getClass());
+
+   public static final String DEFAULT_JOB_DIR = "job";
+
+   /** The directory contains all the jars, which user code depends on. */
+   @Nullable
+   private final String jobDir;
+
+   private List<URL> userClassPaths;
+
+   public AbstractUserClassPathJobGraphRetriever(String jobDir) {
+   this.jobDir = jobDir;
+   }
+
+   public List<URL> getUserClassPaths() throws IOException {
+   if (userClassPaths == null) {
+   userClassPaths = scanJarsInJobClassDir(jobDir);
+   }
+   return userClassPaths;
+   }
+
+   private List<URL> scanJarsInJobClassDir(String dir) throws IOException {
+
+   if (dir == null) {
+   return Collections.emptyList();
+   }
+
+   final File dirFile = new File(new Path(dir).toString());
+   final List<URL> jarURLs = new LinkedList<>();
+
+   if (!dirFile.exists()) {
+   log.warn("the job dir " + dirFile + " dose not exists.");
+   return Collections.emptyList();
+   }
+   if (!dirFile.isDirectory()) {
+   log.warn("the job dir " + dirFile + " is not a directory.");
+   return Collections.emptyList();
+   }
+
+   Files.walkFileTree(dirFile.toPath(),
 
 Review comment:
   I'm not sure whether the file tree can contain cycles in all our cases.
   If there is a cycle (which can only happen when symbolic links are followed), the walk reports a `FileSystemLoopException`, `SimpleFileVisitor` rethrows it directly, and the job may fail.
   If we want to handle it, we need to override `SimpleFileVisitor#visitFileFailed(..)`.
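   Here is a sketch of the suggested override. It assumes the walk follows symbolic links (`FileVisitOption.FOLLOW_LINKS`). The `EnumSet` and `FileVisitOption` imports hint at this, but the truncated diff does not confirm it. With that option, `Files.walkFileTree` reports a detected cycle by calling `visitFileFailed` with a `FileSystemLoopException`. The visitor below skips such entries and still rethrows any other I/O error. `LoopTolerantJarScanner` is a hypothetical name.

```java
import java.io.IOException;
import java.net.URL;
import java.nio.file.FileSystemLoopException;
import java.nio.file.FileVisitOption;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;

class LoopTolerantJarScanner {

    static List<URL> scanJars(Path dir) throws IOException {
        final List<URL> jarURLs = new ArrayList<>();
        Files.walkFileTree(dir, EnumSet.of(FileVisitOption.FOLLOW_LINKS), Integer.MAX_VALUE,
            new SimpleFileVisitor<Path>() {
                @Override
                public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                    if (file.getFileName().toString().endsWith(".jar")) {
                        jarURLs.add(file.toUri().toURL());
                    }
                    return FileVisitResult.CONTINUE;
                }

                @Override
                public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
                    if (exc instanceof FileSystemLoopException) {
                        // A symlink leads back to an ancestor directory: skip it instead of failing.
                        return FileVisitResult.CONTINUE;
                    }
                    // Other I/O errors are still propagated, as SimpleFileVisitor does by default.
                    throw exc;
                }
            });
        return jarURLs;
    }
}
```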


[GitHub] [flink] zhuzhurk commented on a change in pull request #9715: [FLINK-13993][runtime] Using FlinkUserCodeClassLoaders to load the user class in the perjob mode

2019-09-28 Thread GitBox
zhuzhurk commented on a change in pull request #9715: [FLINK-13993][runtime] 
Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
URL: https://github.com/apache/flink/pull/9715#discussion_r329313956
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetrieverTest.java
 ##
 @@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.entrypoint.component;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link AbstractUserClassPathJobGraphRetriever}.
+ */
+public class AbstractUserClassPathJobGraphRetrieverTest {
 
 Review comment:
   The test class should extend `TestLogger`.

