This is an automated email from the ASF dual-hosted git repository.

uce pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 34aa3f5ac9e5efb4d10680dd885ba1bf2f81b7f6
Author: Ufuk Celebi <u...@apache.org>
AuthorDate: Mon Feb 25 17:48:50 2019 +0100

    [FLINK-11533] [container] Make jobClassName argument optional
    
    [pr-review] Address comments
---
 .../entrypoint/ClassPathJobGraphRetriever.java     | 20 +++++++++++++++-----
 .../StandaloneJobClusterConfiguration.java         | 22 +++++++++++-----------
 ...daloneJobClusterConfigurationParserFactory.java | 14 +++++++-------
 .../entrypoint/StandaloneJobClusterEntryPoint.java | 19 ++++++++++---------
 .../entrypoint/ClassPathJobGraphRetrieverTest.java |  8 ++++----
 ...neJobClusterConfigurationParserFactoryTest.java | 12 +++++++++---
 6 files changed, 56 insertions(+), 39 deletions(-)

diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
index 5554168..092ef8b 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
@@ -29,7 +29,11 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.util.FlinkException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import static java.util.Objects.requireNonNull;
 
@@ -39,8 +43,10 @@ import static java.util.Objects.requireNonNull;
  */
 class ClassPathJobGraphRetriever implements JobGraphRetriever {
 
-       @Nonnull
-       private final String jobClassName;
+       private static final Logger LOG = LoggerFactory.getLogger(ClassPathJobGraphRetriever.class);
+       private static final String JAVA_CLASS_PATH = "java.class.path";
+       private static final String PATH_SEPARATOR = "path.separator";
+       private static final String DEFAULT_PATH_SEPARATOR = ":";
 
        @Nonnull
        private final JobID jobId;
@@ -51,15 +57,18 @@ class ClassPathJobGraphRetriever implements JobGraphRetriever {
        @Nonnull
        private final String[] programArguments;
 
+       @Nullable
+       private final String jobClassName;
+
        ClassPathJobGraphRetriever(
-                       @Nonnull String jobClassName,
                        @Nonnull JobID jobId,
                        @Nonnull SavepointRestoreSettings savepointRestoreSettings,
-                       @Nonnull String[] programArguments) {
-               this.jobClassName = requireNonNull(jobClassName, "jobClassName");
+                       @Nonnull String[] programArguments,
+                       @Nullable String jobClassName) {
                this.jobId = requireNonNull(jobId, "jobId");
                this.savepointRestoreSettings = requireNonNull(savepointRestoreSettings, "savepointRestoreSettings");
                this.programArguments = requireNonNull(programArguments, "programArguments");
+               this.jobClassName = jobClassName;
        }
 
        @Override
@@ -89,4 +98,5 @@ class ClassPathJobGraphRetriever implements JobGraphRetriever {
                        throw new FlinkException("Could not load the provided entrypoint class.", e);
                }
        }
+
 }
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
index 8d28b89..875a7c5 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfiguration.java
@@ -35,32 +35,27 @@ import static java.util.Objects.requireNonNull;
 final class StandaloneJobClusterConfiguration extends EntrypointClusterConfiguration {
 
        @Nonnull
-       private final String jobClassName;
-
-       @Nonnull
        private final SavepointRestoreSettings savepointRestoreSettings;
 
        @Nonnull
        private final JobID jobId;
 
+       @Nullable
+       private final String jobClassName;
+
        StandaloneJobClusterConfiguration(
                        @Nonnull String configDir,
                        @Nonnull Properties dynamicProperties,
                        @Nonnull String[] args,
                        @Nullable String hostname,
                        int restPort,
-                       @Nonnull String jobClassName,
                        @Nonnull SavepointRestoreSettings savepointRestoreSettings,
-                       @Nonnull JobID jobId) {
+                       @Nonnull JobID jobId,
+                       @Nullable String jobClassName) {
                super(configDir, dynamicProperties, args, hostname, restPort);
-               this.jobClassName = requireNonNull(jobClassName, "jobClassName");
                this.savepointRestoreSettings = requireNonNull(savepointRestoreSettings, "savepointRestoreSettings");
                this.jobId = requireNonNull(jobId, "jobId");
-       }
-
-       @Nonnull
-       String getJobClassName() {
-               return jobClassName;
+               this.jobClassName = jobClassName;
        }
 
        @Nonnull
@@ -72,4 +67,9 @@ final class StandaloneJobClusterConfiguration extends EntrypointClusterConfigura
        JobID getJobId() {
                return jobId;
        }
+
+       @Nullable
+       String getJobClassName() {
+               return jobClassName;
+       }
 }
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
index a404677..16fa63d 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
@@ -47,7 +47,7 @@ public class StandaloneJobClusterConfigurationParserFactory implements ParserRes
 
        private static final Option JOB_CLASS_NAME_OPTION = Option.builder("j")
                .longOpt("job-classname")
-               .required(true)
+               .required(false)
                .hasArg(true)
                .argName("job class name")
                .desc("Class name of the job to run.")
@@ -81,9 +81,9 @@ public class StandaloneJobClusterConfigurationParserFactory implements ParserRes
                final Properties dynamicProperties = commandLine.getOptionProperties(DYNAMIC_PROPERTY_OPTION.getOpt());
                final int restPort = getRestPort(commandLine);
                final String hostname = commandLine.getOptionValue(HOST_OPTION.getOpt());
-               final String jobClassName = commandLine.getOptionValue(JOB_CLASS_NAME_OPTION.getOpt());
                final SavepointRestoreSettings savepointRestoreSettings = CliFrontendParser.createSavepointRestoreSettings(commandLine);
                final JobID jobId = getJobId(commandLine);
+               final String jobClassName = commandLine.getOptionValue(JOB_CLASS_NAME_OPTION.getOpt());
 
                return new StandaloneJobClusterConfiguration(
                        configDir,
@@ -91,9 +91,9 @@ public class StandaloneJobClusterConfigurationParserFactory implements ParserRes
                        commandLine.getArgs(),
                        hostname,
                        restPort,
-                       jobClassName,
                        savepointRestoreSettings,
-                       jobId);
+                       jobId,
+                       jobClassName);
        }
 
        private int getRestPort(CommandLine commandLine) throws FlinkParseException {
@@ -101,7 +101,7 @@ public class StandaloneJobClusterConfigurationParserFactory implements ParserRes
                try {
                        return Integer.parseInt(restPortString);
                } catch (NumberFormatException e) {
-                       throw flinkParseException(REST_PORT_OPTION, e);
+                       throw createFlinkParseException(REST_PORT_OPTION, e);
                }
        }
 
@@ -113,11 +113,11 @@ public class StandaloneJobClusterConfigurationParserFactory implements ParserRes
                try {
                        return JobID.fromHexString(jobId);
                } catch (IllegalArgumentException e) {
-                       throw flinkParseException(JOB_ID_OPTION, e);
+                       throw createFlinkParseException(JOB_ID_OPTION, e);
                }
        }
 
-       private static FlinkParseException flinkParseException(Option option, Exception cause) {
+       private static FlinkParseException createFlinkParseException(Option option, Exception cause) {
                return new FlinkParseException(String.format("Failed to parse '--%s' option", option.getLongOpt()), cause);
        }
 }
diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
index 7fbcc8b..651a874 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterEntryPoint.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import static java.util.Objects.requireNonNull;
 
@@ -42,9 +43,6 @@ import static java.util.Objects.requireNonNull;
 public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {
 
        @Nonnull
-       private final String jobClassName;
-
-       @Nonnull
        private final JobID jobId;
 
        @Nonnull
@@ -53,24 +51,27 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {
        @Nonnull
        private final String[] programArguments;
 
+       @Nullable
+       private final String jobClassName;
+
        private StandaloneJobClusterEntryPoint(
                        Configuration configuration,
-                       @Nonnull String jobClassName,
                        @Nonnull JobID jobId,
                        @Nonnull SavepointRestoreSettings savepointRestoreSettings,
-                       @Nonnull String[] programArguments) {
+                       @Nonnull String[] programArguments,
+                       @Nullable String jobClassName) {
                super(configuration);
-               this.jobClassName = requireNonNull(jobClassName, "jobClassName");
                this.jobId = requireNonNull(jobId, "jobId");
                this.savepointRestoreSettings = requireNonNull(savepointRestoreSettings, "savepointRestoreSettings");
                this.programArguments = requireNonNull(programArguments, "programArguments");
+               this.jobClassName = jobClassName;
        }
 
        @Override
        protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
                return new JobDispatcherResourceManagerComponentFactory(
                        StandaloneResourceManagerFactory.INSTANCE,
-                       new ClassPathJobGraphRetriever(jobClassName, jobId, savepointRestoreSettings, programArguments));
+                       new ClassPathJobGraphRetriever(jobId, savepointRestoreSettings, programArguments, jobClassName));
        }
 
        public static void main(String[] args) {
@@ -96,10 +97,10 @@ public final class StandaloneJobClusterEntryPoint extends JobClusterEntrypoint {
 
                StandaloneJobClusterEntryPoint entrypoint = new StandaloneJobClusterEntryPoint(
                        configuration,
-                       clusterConfiguration.getJobClassName(),
                        clusterConfiguration.getJobId(),
                        clusterConfiguration.getSavepointRestoreSettings(),
-                       clusterConfiguration.getArgs());
+                       clusterConfiguration.getArgs(),
+                       clusterConfiguration.getJobClassName());
 
                ClusterEntrypoint.runClusterEntrypoint(entrypoint);
        }
diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java
index cd038b3..d930c57 100644
--- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java
+++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetrieverTest.java
@@ -48,10 +48,10 @@ public class ClassPathJobGraphRetrieverTest extends TestLogger {
                final JobID jobId = new JobID();
 
                final ClassPathJobGraphRetriever classPathJobGraphRetriever = new ClassPathJobGraphRetriever(
-                       TestJob.class.getCanonicalName(),
                        jobId,
                        SavepointRestoreSettings.none(),
-                       PROGRAM_ARGUMENTS);
+                       PROGRAM_ARGUMENTS,
+                       TestJob.class.getCanonicalName());
 
                final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(configuration);
 
@@ -67,10 +67,10 @@ public class ClassPathJobGraphRetrieverTest extends TestLogger {
                final JobID jobId = new JobID();
 
                final ClassPathJobGraphRetriever classPathJobGraphRetriever = new ClassPathJobGraphRetriever(
-                       TestJob.class.getCanonicalName(),
                        jobId,
                        savepointRestoreSettings,
-                       PROGRAM_ARGUMENTS);
+                       PROGRAM_ARGUMENTS,
+                       TestJob.class.getCanonicalName());
 
                final JobGraph jobGraph = classPathJobGraphRetriever.retrieveJobGraph(configuration);
 
diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java
index b29d382..b856dc9 100644
--- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java
+++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java
@@ -36,6 +36,8 @@ import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.core.IsNull.nullValue;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -77,14 +79,18 @@ public class StandaloneJobClusterConfigurationParserFactoryTest extends TestLogg
        @Test
        public void testOnlyRequiredArguments() throws FlinkParseException {
                final String configDir = "/foo/bar";
-               final String jobClassName = "foobar";
-               final String[] args = {"--configDir", configDir, "--job-classname", jobClassName};
+               final String[] args = {"--configDir", configDir};
 
                final StandaloneJobClusterConfiguration clusterConfiguration = commandLineParser.parse(args);
 
                assertThat(clusterConfiguration.getConfigDir(), is(equalTo(configDir)));
-               assertThat(clusterConfiguration.getJobClassName(), is(equalTo(jobClassName)));
+               assertThat(clusterConfiguration.getDynamicProperties(), is(equalTo(new Properties())));
+               assertThat(clusterConfiguration.getArgs(), is(new String[0]));
                assertThat(clusterConfiguration.getRestPort(), is(equalTo(-1)));
+               assertThat(clusterConfiguration.getHostname(), is(nullValue()));
+               assertThat(clusterConfiguration.getSavepointRestoreSettings(), is(equalTo(SavepointRestoreSettings.none())));
+               assertThat(clusterConfiguration.getJobId(), is(not(nullValue())));
+               assertThat(clusterConfiguration.getJobClassName(),  is(nullValue()));
        }
 
        @Test(expected = FlinkParseException.class)

Reply via email to