This is an automated email from the ASF dual-hosted git repository. godfrey pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git
commit beaeead0a582fa646b6f26bcafcc106d4e07a293 Author: godfreyhe <[email protected]> AuthorDate: Thu Nov 26 11:39:46 2020 +0800 [FLINK-18545][configuration] Allow users to specify job name by configuration in ExecutionEnvironment (cherry picked from commit eaad1678f2de23addcee7592e982f61d690777a7) --- .../flink/client/ExecutionEnvironmentTest.java | 35 ++++++++++++++++++++++ .../flink/api/java/ExecutionEnvironment.java | 19 +++++++----- 2 files changed, 46 insertions(+), 8 deletions(-) diff --git a/flink-clients/src/test/java/org/apache/flink/client/ExecutionEnvironmentTest.java b/flink-clients/src/test/java/org/apache/flink/client/ExecutionEnvironmentTest.java index f512fe8..5f22fbd 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/ExecutionEnvironmentTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/ExecutionEnvironmentTest.java @@ -19,10 +19,13 @@ package org.apache.flink.client; +import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -30,6 +33,7 @@ import org.junit.Test; import java.io.Serializable; import static junit.framework.TestCase.fail; +import static org.junit.Assert.assertTrue; /** @@ -63,4 +67,35 @@ public class ExecutionEnvironmentTest extends TestLogger implements Serializable fail("Consecutive #getExecutionPlan calls caused an exception."); } } + + @Test + public void testDefaultJobName() { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + testJobName("Flink Java Job at", env); + } + + @Test + public void testUserDefinedJobName() { + String jobName = "MyTestJob"; + Configuration config = new Configuration(); + config.set(PipelineOptions.NAME, jobName); + ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(config); + testJobName(jobName, env); + } + + @Test + public void testUserDefinedJobNameWithConfigure() { + String jobName = "MyTestJob"; + Configuration config = new Configuration(); + config.set(PipelineOptions.NAME, jobName); + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.configure(config, this.getClass().getClassLoader()); + testJobName(jobName, env); + } + + private void testJobName(String prefixOfExpectedJobName, ExecutionEnvironment env) { + env.fromElements(1, 2, 3).writeAsText("/dev/null"); + Plan plan = env.createProgramPlan(); + assertTrue(plan.getJobName().startsWith(prefixOfExpectedJobName)); + } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index 52a0d85..c3f3268 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -418,6 +418,8 @@ public class ExecutionEnvironment { this.cacheFile.clear(); this.cacheFile.addAll(DistributedCache.parseCachedFilesFromString(f)); }); + configuration.getOptional(PipelineOptions.NAME) + .ifPresent(jobName -> this.getConfiguration().set(PipelineOptions.NAME, jobName)); config.configure(configuration, classLoader); } @@ -870,7 +872,7 @@ public class ExecutionEnvironment { * @throws Exception Thrown, if the program executions fails. */ public JobExecutionResult execute() throws Exception { - return execute(getDefaultName()); + return execute(getJobName()); } /** @@ -945,7 +947,7 @@ public class ExecutionEnvironment { */ @PublicEvolving public final JobClient executeAsync() throws Exception { - return executeAsync(getDefaultName()); + return executeAsync(getJobName()); } /** @@ -998,7 +1000,7 @@ public class ExecutionEnvironment { * @throws Exception Thrown, if the compiler could not be instantiated. */ public String getExecutionPlan() throws Exception { - Plan p = createProgramPlan(getDefaultName(), false); + Plan p = createProgramPlan(getJobName(), false); return ExecutionPlanUtil.getExecutionPlanAsJSON(p); } @@ -1051,7 +1053,7 @@ public class ExecutionEnvironment { */ @Internal public Plan createProgramPlan() { - return createProgramPlan(getDefaultName()); + return createProgramPlan(getJobName()); } /** @@ -1122,12 +1124,13 @@ public class ExecutionEnvironment { } /** - * Gets a default job name, based on the timestamp when this method is invoked. + * Gets the job name. If user defined job name is not found in the configuration, + * the default name based on the timestamp when this method is invoked will return. * - * @return A default job name. + * @return A job name. */ - private static String getDefaultName() { - return "Flink Java Job at " + Calendar.getInstance().getTime(); + private String getJobName() { + return configuration.getString(PipelineOptions.NAME, "Flink Java Job at " + Calendar.getInstance().getTime()); } // --------------------------------------------------------------------------------------------
