This is an automated email from the ASF dual-hosted git repository. uce pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 34aa3f5ac9e5efb4d10680dd885ba1bf2f81b7f6 Author: Ufuk Celebi <u...@apache.org> AuthorDate: Mon Feb 25 17:48:50 2019 +0100 [FLINK-11533] [container] Make jobClassName argument optional [pr-review] Address comments --- .../entrypoint/ClassPathJobGraphRetriever.java | 20 +++++++++++++++----- .../StandaloneJobClusterConfiguration.java | 22 +++++++++++----------- ...daloneJobClusterConfigurationParserFactory.java | 14 +++++++------- .../entrypoint/StandaloneJobClusterEntryPoint.java | 19 ++++++++++--------- .../entrypoint/ClassPathJobGraphRetrieverTest.java | 8 ++++---- ...neJobClusterConfigurationParserFactoryTest.java | 12 +++++++++--- 6 files changed, 56 insertions(+), 39 deletions(-) diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java index 5554168..092ef8b 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java @@ -29,7 +29,11 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.util.FlinkException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nonnull; +import javax.annotation.Nullable; import static java.util.Objects.requireNonNull; @@ -39,8 +43,10 @@ import static java.util.Objects.requireNonNull; */ class ClassPathJobGraphRetriever implements JobGraphRetriever { - @Nonnull - private final String jobClassName; + private static final Logger LOG = LoggerFactory.getLogger(ClassPathJobGraphRetriever.class); + private static final String JAVA_CLASS_PATH = "java.class.path"; + private static final String PATH_SEPARATOR = "path.separator"; + private static final String DEFAULT_PATH_SEPARATOR = ":"; @Nonnull private final JobID jobId; @@ -51,15 +57,18 @@ class ClassPathJobGraphRetriever implements JobGraphRetriever { @Nonnull private final String[] programArguments; + @Nullable + private final String jobClassName; + ClassPathJobGraphRetriever( - @Nonnull String jobClassName, @Nonnull JobID jobId, @Nonnull SavepointRestoreSettings savepointRestoreSettings, - @Nonnull String[] programArguments) { - this.jobClassName = requireNonNull(jobClassName, "jobClassName"); + @Nonnull String[] programArguments, + @Nullable String jobClassName) { this.jobId = requireNonNull(jobId, "jobId"); this.savepointRestoreSettings = requireNonNull(savepointRestoreSettings, "savepointRestoreSettings"); this.programArguments = requireNonNull(programArguments, "programArguments"); + this.jobClassName = jobClassName; } @Override @@ -89,4 +98,5 @@ class ClassPathJobGraphRetriever implements JobGraphRetriever { throw new FlinkException("Could not load the provided entrypoint class.", e); } } + } diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java index 8d28b89..875a7c5 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java @@ -35,32 +35,27 @@ import static java.util.Objects.requireNonNull; final class StandaloneJobClusterConfiguration extends EntrypointClusterConfiguration { @Nonnull - private final String jobClassName; - - @Nonnull private final SavepointRestoreSettings savepointRestoreSettings; @Nonnull private final JobID jobId; + @Nullable + private final String jobClassName; + StandaloneJobClusterConfiguration( @Nonnull String configDir, @Nonnull Properties dynamicProperties, @Nonnull String[] args, @Nullable String hostname, int restPort, - @Nonnull String jobClassName, @Nonnull SavepointRestoreSettings savepointRestoreSettings, - @Nonnull JobID jobId) { + @Nonnull JobID jobId, + @Nullable String jobClassName) { super(configDir, dynamicProperties, args, hostname, restPort); - this.jobClassName = requireNonNull(jobClassName, "jobClassName"); this.savepointRestoreSettings = requireNonNull(savepointRestoreSettings, "savepointRestoreSettings"); this.jobId = requireNonNull(jobId, "jobId"); - } - - @Nonnull - String getJobClassName() { - return jobClassName; + this.jobClassName = jobClassName; } @Nonnull @@ -72,4 +67,9 @@ final class StandaloneJobClusterConfiguration extends EntrypointClusterConfigura JobID getJobId() { return jobId; } + + @Nullable + String getJobClassName() { + return jobClassName; + } } diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java index a404677..16fa63d 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java @@ -47,7 +47,7 @@ public class StandaloneJobClusterConfigurationParserFactory implements ParserRes private static final Option JOB_CLASS_NAME_OPTION = Option.builder("j") .longOpt("job-classname") - .required(true) + .required(false) .hasArg(true) .argName("job class name") .desc("Class name of the job to run.") @@ -81,9 +81,9 @@ public class StandaloneJobClusterConfigurationParserFactory implements ParserRes final Properties dynamicProperties = commandLine.getOptionProperties(DYNAMIC_PROPERTY_OPTION.getOpt()); final int restPort = getRestPort(commandLine); final String hostname = commandLine.getOptionValue(HOST_OPTION.getOpt()); - final String jobClassName = commandLine.getOptionValue(JOB_CLASS_NAME_OPTION.getOpt()); final SavepointRestoreSettings savepointRestoreSettings = CliFrontendParser.createSavepointRestoreSettings(commandLine); final JobID jobId = getJobId(commandLine); + final String jobClassName = commandLine.getOptionValue(JOB_CLASS_NAME_OPTION.getOpt()); return new StandaloneJobClusterConfiguration( configDir, @@ -91,9 +91,9 @@ public class StandaloneJobClusterConfigurationParserFactory implements ParserRes commandLine.getArgs(), hostname, restPort, - jobClassName, savepointRestoreSettings, - jobId); + jobId, + jobClassName); } private int getRestPort(CommandLine commandLine) throws FlinkParseException { @@ -101,7 +101,7 @@ public class StandaloneJobClusterConfigurationParserFactory implements ParserRes try { return Integer.parseInt(restPortString); } catch (NumberFormatException e) { - throw flinkParseException(REST_PORT_OPTION, e); + throw createFlinkParseException(REST_PORT_OPTION, e); } } @@ -113,11 +113,11 @@ public class StandaloneJobClusterConfigurationParserFactory implements ParserRes try { return JobID.fromHexString(jobId); } catch (IllegalArgumentException e) { - throw flinkParseException(JOB_ID_OPTION, e); + throw createFlinkParseException(JOB_ID_OPTION, e); } } - private static FlinkParseException flinkParseException(Option option, Exception cause) { + private static FlinkParseException createFlinkParseException(Option option, Exception cause) { return new FlinkParseException(String.format("Failed to parse '--%s' option", option.getLongOpt()), cause); } } diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java index 7fbcc8b..651a874 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.util.JvmShutdownSafeguard; import org.apache.flink.runtime.util.SignalHandler; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import static java.util.Objects.requireNonNull; @@ -42,9 +43,6 @@ import static java.util.Objects.requireNonNull; public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint { @Nonnull - private final String jobClassName; - - @Nonnull private final JobID jobId; @Nonnull @@ -53,24 +51,27 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint { @Nonnull private final String[] programArguments; + @Nullable + private final String jobClassName; + private StandaloneJobClusterEntryPoint( Configuration configuration, - @Nonnull String jobClassName, @Nonnull JobID jobId, @Nonnull SavepointRestoreSettings savepointRestoreSettings, - @Nonnull String[] programArguments) { + @Nonnull String[] programArguments, + @Nullable String jobClassName) { super(configuration); - this.jobClassName = requireNonNull(jobClassName, "jobClassName"); this.jobId = requireNonNull(jobId, "jobId"); this.savepointRestoreSettings = requireNonNull(savepointRestoreSettings, "savepointRestoreSettings"); this.programArguments = requireNonNull(programArguments, "programArguments"); + this.jobClassName = jobClassName; } @Override protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) { return new JobDispatcherResourceManagerComponentFactory( StandaloneResourceManagerFactory.INSTANCE, - new ClassPathJobGraphRetriever(jobClassName, jobId, savepointRestoreSettings, programArguments)); + new ClassPathJobGraphRetriever(jobId, savepointRestoreSettings, programArguments, jobClassName)); } public static void main(String[] args) { @@ -96,10 +97,10 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint { StandaloneJobClusterEntryPoint entrypoint = new StandaloneJobClusterEntryPoint( configuration, - clusterConfiguration.getJobClassName(), clusterConfiguration.getJobId(), clusterConfiguration.getSavepointRestoreSettings(), - clusterConfiguration.getArgs()); + clusterConfiguration.getArgs(), + clusterConfiguration.getJobClassName()); ClusterEntrypoint.runClusterEntrypoint(entrypoint); } diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java index cd038b3..d930c57 100644 --- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java +++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java @@ -48,10 +48,10 @@ public class ClassPathJobGraphRetrieverTest extends TestLogger { final JobID jobId = new JobID(); final ClassPathJobGraphRetriever classPathJobGraphRetriever = new ClassPathJobGraphRetriever( - TestJob.class.getCanonicalName(), jobId, SavepointRestoreSettings.none(), - PROGRAM_ARGUMENTS); + PROGRAM_ARGUMENTS, + TestJob.class.getCanonicalName()); final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(configuration); @@ -67,10 +67,10 @@ public class ClassPathJobGraphRetrieverTest extends TestLogger { final JobID jobId = new JobID(); final ClassPathJobGraphRetriever classPathJobGraphRetriever = new ClassPathJobGraphRetriever( - TestJob.class.getCanonicalName(), jobId, savepointRestoreSettings, - PROGRAM_ARGUMENTS); + PROGRAM_ARGUMENTS, + TestJob.class.getCanonicalName()); final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(configuration); diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java index b29d382..b856dc9 100644 --- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java +++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java @@ -36,6 +36,8 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.core.IsNull.nullValue; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -77,14 +79,18 @@ public class StandaloneJobClusterConfigurationParserFactoryTest extends TestLogg @Test public void testOnlyRequiredArguments() throws FlinkParseException { final String configDir = "/foo/bar"; - final String jobClassName = "foobar"; - final String[] args = {"--configDir", configDir, "--job-classname", jobClassName}; + final String[] args = {"--configDir", configDir}; final StandaloneJobClusterConfiguration clusterConfiguration = commandLineParser.parse(args); assertThat(clusterConfiguration.getConfigDir(), is(equalTo(configDir))); - assertThat(clusterConfiguration.getJobClassName(), is(equalTo(jobClassName))); + assertThat(clusterConfiguration.getDynamicProperties(), is(equalTo(new Properties()))); + assertThat(clusterConfiguration.getArgs(), is(new String[0])); assertThat(clusterConfiguration.getRestPort(), is(equalTo(-1))); + assertThat(clusterConfiguration.getHostname(), is(nullValue())); + assertThat(clusterConfiguration.getSavepointRestoreSettings(), is(equalTo(SavepointRestoreSettings.none()))); + assertThat(clusterConfiguration.getJobId(), is(not(nullValue()))); + assertThat(clusterConfiguration.getJobClassName(), is(nullValue())); } @Test(expected = FlinkParseException.class)