[GitHub] [flink] zhuzhurk commented on a change in pull request #9715: [FLINK-13993][runtime] Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
zhuzhurk commented on a change in pull request #9715: [FLINK-13993][runtime] Using FlinkUserCodeClassLoaders to load the user class in the perjob mode URL: https://github.com/apache/flink/pull/9715#discussion_r329319129 ## File path: flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java ## @@ -110,12 +121,14 @@ public JobGraph retrieveJobGraph(Configuration configuration) throws FlinkExcept } } - private PackagedProgram createPackagedProgram() throws FlinkException { + private PackagedProgram createPackagedProgram() throws FlinkException, IOException { final String entryClass = getJobClassNameOrScanClassPath(); try { - final Class mainClass = getClass().getClassLoader().loadClass(entryClass); - return new PackagedProgram(mainClass, programArguments); - } catch (ClassNotFoundException | ProgramInvocationException e) { + final String userClassPaths = getUserClassPaths(). + stream().map(URL::toString).collect(Collectors.joining(":")); Review comment: According to the [code style guide](https://flink.apache.org/contributing/code-style-and-quality-formatting.html#breaking-the-lines-of-too-long-statements), each chained call should go on its own line when breaking long statements. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
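A minimal sketch of the suggested line breaking, assuming `getUserClassPaths()` returns a `List<URL>` (the generic type is not visible in the quoted diff). `UserClassPathFormatting` and `joinClassPaths` are hypothetical names used only for illustration; the `":"` separator is taken from the diff as-is:

```java
import java.net.URL;
import java.util.List;
import java.util.stream.Collectors;

final class UserClassPathFormatting {

	private UserClassPathFormatting() {
	}

	// Hypothetical helper: same logic as the diff, but the long statement is broken
	// so that every chained call starts on its own line, with the dot leading.
	static String joinClassPaths(List<URL> userClassPaths) {
		return userClassPaths
			.stream()
			.map(URL::toString)
			.collect(Collectors.joining(":"));
	}
}
```

Keeping the dot at the start of each continuation line makes it obvious at a glance that the line continues a call chain.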
[GitHub] [flink] zhuzhurk commented on a change in pull request #9715: [FLINK-13993][runtime] Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
zhuzhurk commented on a change in pull request #9715: [FLINK-13993][runtime] Using FlinkUserCodeClassLoaders to load the user class in the perjob mode URL: https://github.com/apache/flink/pull/9715#discussion_r329321484 ## File path: flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java ## @@ -55,6 +66,62 @@ private static final String[] PROGRAM_ARGUMENTS = {"--arg", "suffix"}; + private static URL testJobForUserClassLoaderJarURL = null; + private static URL testJobDependedJarURL = null; + + private Path userDirHasEntryClass; + private Path userJarHasEntryClass; + private Path userEntryClassDependJarAtUserDirHasEntryClass; + private Path textAtUserDirHasEntryClass; + + private Path userDirHasNotEntryClass; + private Path userJarHasNotEntryClass; + private Path userEntryClassDependJarAtUserDirHasNotEntryClass; + private Path textAtUserDirHasNotEntryClass; + + Review comment: These duplicated empty lines can be removed.
[GitHub] [flink] zhuzhurk commented on a change in pull request #9715: [FLINK-13993][runtime] Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
zhuzhurk commented on a change in pull request #9715: [FLINK-13993][runtime] Using FlinkUserCodeClassLoaders to load the user class in the perjob mode URL: https://github.com/apache/flink/pull/9715#discussion_r329320115 ## File path: flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java ## @@ -77,7 +81,9 @@ private StandaloneJobClusterEntryPoint( protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) { return new JobDispatcherResourceManagerComponentFactory( StandaloneResourceManagerFactory.INSTANCE, - new ClassPathJobGraphRetriever(jobId, savepointRestoreSettings, programArguments, jobClassName)); + new ClassPathJobGraphRetriever(jobId, savepointRestoreSettings, programArguments, jobClassName, + Paths.get(System.getenv(ENV_FLINK_HOME_DIR) == null ? DEFAULT_FLINK_HOME : System.getenv(ENV_FLINK_HOME_DIR), Review comment: Hard-coding "/opt/flink" looks a bit tricky. Is it possible to ensure that "FLINK_HOME" is set in all cases, so that we can throw an exception here for unexpected cases?
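A minimal sketch of the fail-fast alternative, assuming `FLINK_HOME` can be guaranteed in every supported deployment. `FlinkHomeResolver` and `resolveFlinkHome` are hypothetical names; the constant is redefined locally so the sketch compiles on its own, and the environment is passed in as a `Map` (e.g. `System.getenv()`) so it can be exercised without touching the real process environment:

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;

final class FlinkHomeResolver {

	// Local stand-in for the ENV_FLINK_HOME_DIR constant referenced in the diff.
	static final String ENV_FLINK_HOME_DIR = "FLINK_HOME";

	private FlinkHomeResolver() {
	}

	// Throws instead of silently falling back to a hard-coded "/opt/flink".
	static Path resolveFlinkHome(Map<String, String> env) {
		final String flinkHome = env.get(ENV_FLINK_HOME_DIR);
		if (flinkHome == null || flinkHome.isEmpty()) {
			throw new IllegalStateException(
				"The environment variable " + ENV_FLINK_HOME_DIR + " is not set.");
		}
		return Paths.get(flinkHome);
	}
}
```

The entry point would then call something like `FlinkHomeResolver.resolveFlinkHome(System.getenv())` and resolve the job directory against the result, so a misconfigured container fails at startup with a clear message.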
[GitHub] [flink] zhuzhurk commented on a change in pull request #9715: [FLINK-13993][runtime] Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
zhuzhurk commented on a change in pull request #9715: [FLINK-13993][runtime] Using FlinkUserCodeClassLoaders to load the user class in the perjob mode URL: https://github.com/apache/flink/pull/9715#discussion_r329318669 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetriever.java ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.entrypoint.component; + +import org.apache.flink.core.fs.Path; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.nio.file.FileVisitOption; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.Collections; +import java.util.EnumSet; +import java.util.LinkedList; +import java.util.List; + +/** + * Abstract class for the JobGraphRetriever, which wants to get classpath user's code depends on. 
+ */ + +public abstract class AbstractUserClassPathJobGraphRetriever implements JobGraphRetriever { + + private final Logger log = LoggerFactory.getLogger(getClass()); Review comment: Alternatively, we could make it a private static logger of this class; then `scanJarsInJobClassDir` can be made static as well.
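A minimal sketch of the static variant. It uses `java.util.logging` only so the example runs without dependencies (in Flink the field would be an SLF4J `Logger` from `LoggerFactory.getLogger(AbstractUserClassPathJobGraphRetriever.class)`), uses `new File(dir)` in place of the Flink `Path` round trip, and walks the tree with `Files.walk` instead of a custom visitor. `UserClassPathScanner` is a hypothetical name:

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class UserClassPathScanner {

	// Static: one logger for the declaring class, usable from static methods.
	private static final Logger LOG = Logger.getLogger(UserClassPathScanner.class.getName());

	private UserClassPathScanner() {
	}

	// Static: the result depends only on the argument, not on instance state.
	static List<URL> scanJarsInJobClassDir(String dir) throws IOException {
		if (dir == null) {
			return Collections.emptyList();
		}
		final File dirFile = new File(dir);
		if (!dirFile.exists()) {
			LOG.warning("The job dir " + dirFile + " does not exist.");
			return Collections.emptyList();
		}
		if (!dirFile.isDirectory()) {
			LOG.warning("The job dir " + dirFile + " is not a directory.");
			return Collections.emptyList();
		}
		// Files.walk does not follow symbolic links by default, so it cannot hit a link cycle.
		try (Stream<Path> files = Files.walk(dirFile.toPath())) {
			return files
				.filter(path -> Files.isRegularFile(path) && path.toString().endsWith(".jar"))
				.map(UserClassPathScanner::toUrl)
				.collect(Collectors.toList());
		}
	}

	private static URL toUrl(Path path) {
		try {
			return path.toUri().toURL();
		} catch (MalformedURLException e) {
			throw new UncheckedIOException(e);
		}
	}
}
```

With a static logger, log records are attributed to the declaring class rather than to the concrete subclass; that is the trade-off against the protected instance logger.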
[GitHub] [flink] zhuzhurk commented on a change in pull request #9715: [FLINK-13993][runtime] Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
zhuzhurk commented on a change in pull request #9715: [FLINK-13993][runtime] Using FlinkUserCodeClassLoaders to load the user class in the perjob mode URL: https://github.com/apache/flink/pull/9715#discussion_r329314051 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetrieverTest.java ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.entrypoint.component; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.JobGraph; + +import org.apache.commons.collections.CollectionUtils; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the {@link AbstractUserClassPathJobGraphRetriever}. 
+ */ +public class AbstractUserClassPathJobGraphRetrieverTest { + + @Rule + public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + /** +* Test class. +*/ + public static class TestJobGraphRetrieverTest extends AbstractUserClassPathJobGraphRetriever { Review comment: I guess the name should be `TestJobGraphRetriever`.
[GitHub] [flink] zhuzhurk commented on a change in pull request #9715: [FLINK-13993][runtime] Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
zhuzhurk commented on a change in pull request #9715: [FLINK-13993][runtime] Using FlinkUserCodeClassLoaders to load the user class in the perjob mode URL: https://github.com/apache/flink/pull/9715#discussion_r329317178 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetriever.java ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.entrypoint.component; + +import org.apache.flink.core.fs.Path; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.nio.file.FileVisitOption; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.Collections; +import java.util.EnumSet; +import java.util.LinkedList; +import java.util.List; + +/** + * Abstract class for the JobGraphRetriever, which wants to get classpath user's code depends on. 
+ */ + +public abstract class AbstractUserClassPathJobGraphRetriever implements JobGraphRetriever { + + private final Logger log = LoggerFactory.getLogger(getClass()); Review comment: I think the logger should be `protected`, so that subclasses of `AbstractUserClassPathJobGraphRetriever` can use it rather than creating their own loggers.
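A minimal sketch of the protected-logger variant. It uses `java.util.logging` only so it runs standalone; SLF4J's `LoggerFactory.getLogger(getClass())` resolves the class the same way. `BaseJobGraphRetrieverSketch` and `ExampleRetrieverSketch` are hypothetical names:

```java
import java.util.logging.Logger;

abstract class BaseJobGraphRetrieverSketch {

	// getClass() returns the runtime subclass, so records are attributed to the concrete retriever.
	protected final Logger log = Logger.getLogger(getClass().getName());
}

class ExampleRetrieverSketch extends BaseJobGraphRetrieverSketch {

	void retrieve() {
		// Reuses the inherited logger instead of declaring a second one.
		log.info("Retrieving the job graph.");
	}
}
```

Because the field is initialized from `getClass()`, each subclass automatically logs under its own name without declaring a logger of its own.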
[GitHub] [flink] zhuzhurk commented on a change in pull request #9715: [FLINK-13993][runtime] Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
zhuzhurk commented on a change in pull request #9715: [FLINK-13993][runtime] Using FlinkUserCodeClassLoaders to load the user class in the perjob mode URL: https://github.com/apache/flink/pull/9715#discussion_r329317918 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetriever.java ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.entrypoint.component; + +import org.apache.flink.core.fs.Path; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.nio.file.FileVisitOption; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.Collections; +import java.util.EnumSet; +import java.util.LinkedList; +import java.util.List; + +/** + * Abstract class for the JobGraphRetriever, which wants to get classpath user's code depends on. 
+ */ + +public abstract class AbstractUserClassPathJobGraphRetriever implements JobGraphRetriever { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + public static final String DEFAULT_JOB_DIR = "job"; + + /** The directory contains all the jars, which user code depends on. */ + @Nullable + private final String jobDir; + + private List<URL> userClassPaths; + + public AbstractUserClassPathJobGraphRetriever(String jobDir) { + this.jobDir = jobDir; + } + + public List<URL> getUserClassPaths() throws IOException { + if (userClassPaths == null) { + userClassPaths = scanJarsInJobClassDir(jobDir); + } + return userClassPaths; + } + + private List<URL> scanJarsInJobClassDir(String dir) throws IOException { + + if (dir == null) { + return Collections.emptyList(); + } + + final File dirFile = new File(new Path(dir).toString()); + final List<URL> jarURLs = new LinkedList<>(); + + if (!dirFile.exists()) { + log.warn("the job dir " + dirFile + " dose not exists."); + return Collections.emptyList(); + } + if (!dirFile.isDirectory()) { + log.warn("the job dir " + dirFile + " is not a directory."); + return Collections.emptyList(); + } + + Files.walkFileTree(dirFile.toPath(), Review comment: I'm not sure whether the file tree can contain cycles in all our cases. Cycles typically come from symbolic links, which `Files.walkFileTree` only follows with `FileVisitOption.FOLLOW_LINKS`; in that mode a detected cycle is reported as a `FileSystemLoopException` passed to `visitFileFailed`. The default `SimpleFileVisitor#visitFileFailed(..)` rethrows it, so the job may fail. If we want to handle this case, we need to override `SimpleFileVisitor#visitFileFailed(..)`.
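A minimal sketch of overriding `visitFileFailed`, assuming the walk uses `FileVisitOption.FOLLOW_LINKS` (suggested by the `FileVisitOption`/`EnumSet` imports, though the full call is cut off in the diff) and that skipping a looping link with a warning is the desired behaviour. `LoopTolerantJarScanner` is a hypothetical name, and `java.util.logging` stands in for SLF4J:

```java
import java.io.IOException;
import java.net.URL;
import java.nio.file.FileSystemLoopException;
import java.nio.file.FileVisitOption;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.logging.Logger;

final class LoopTolerantJarScanner {

	private static final Logger LOG = Logger.getLogger(LoopTolerantJarScanner.class.getName());

	private LoopTolerantJarScanner() {
	}

	static List<URL> scanJars(Path dir) throws IOException {
		final List<URL> jarURLs = new ArrayList<>();
		Files.walkFileTree(
			dir,
			EnumSet.of(FileVisitOption.FOLLOW_LINKS),
			Integer.MAX_VALUE,
			new SimpleFileVisitor<Path>() {
				@Override
				public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
					if (file.getFileName().toString().endsWith(".jar")) {
						jarURLs.add(file.toUri().toURL());
					}
					return FileVisitResult.CONTINUE;
				}

				@Override
				public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
					if (exc instanceof FileSystemLoopException) {
						// A followed link points back to an ancestor directory: skip it instead of failing.
						LOG.warning("Skipping " + file + " because it forms a file system loop.");
						return FileVisitResult.CONTINUE;
					}
					// Other I/O errors are still propagated, as the default SimpleFileVisitor does.
					throw exc;
				}
			});
		return jarURLs;
	}
}
```

Whether to skip loops silently, log them, or fail fast is a policy decision; the sketch only shows where that decision would go.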
[GitHub] [flink] zhuzhurk commented on a change in pull request #9715: [FLINK-13993][runtime] Using FlinkUserCodeClassLoaders to load the user class in the perjob mode
zhuzhurk commented on a change in pull request #9715: [FLINK-13993][runtime] Using FlinkUserCodeClassLoaders to load the user class in the perjob mode URL: https://github.com/apache/flink/pull/9715#discussion_r329313956 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/entrypoint/component/AbstractUserClassPathJobGraphRetrieverTest.java ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.entrypoint.component; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.JobGraph; + +import org.apache.commons.collections.CollectionUtils; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the {@link AbstractUserClassPathJobGraphRetriever}. 
+ */ +public class AbstractUserClassPathJobGraphRetrieverTest { Review comment: The test class should extend `TestLogger`.