This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit beaeead0a582fa646b6f26bcafcc106d4e07a293
Author: godfreyhe <[email protected]>
AuthorDate: Thu Nov 26 11:39:46 2020 +0800

    [FLINK-18545][configuration] Allow users to specify job name by configuration in ExecutionEnvironment
    
    (cherry picked from commit eaad1678f2de23addcee7592e982f61d690777a7)
---
 .../flink/client/ExecutionEnvironmentTest.java     | 35 ++++++++++++++++++++++
 .../flink/api/java/ExecutionEnvironment.java       | 19 +++++++-----
 2 files changed, 46 insertions(+), 8 deletions(-)

diff --git a/flink-clients/src/test/java/org/apache/flink/client/ExecutionEnvironmentTest.java b/flink-clients/src/test/java/org/apache/flink/client/ExecutionEnvironmentTest.java
index f512fe8..5f22fbd 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/ExecutionEnvironmentTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/ExecutionEnvironmentTest.java
@@ -19,10 +19,13 @@
 
 package org.apache.flink.client;
 
+import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -30,6 +33,7 @@ import org.junit.Test;
 import java.io.Serializable;
 
 import static junit.framework.TestCase.fail;
+import static org.junit.Assert.assertTrue;
 
 
 /**
@@ -63,4 +67,35 @@ public class ExecutionEnvironmentTest extends TestLogger 
implements Serializable
                        fail("Consecutive #getExecutionPlan calls caused an 
exception.");
                }
        }
+
+       @Test
+       public void testDefaultJobName() {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               testJobName("Flink Java Job at", env);
+       }
+
+       @Test
+       public void testUserDefinedJobName() {
+               String jobName = "MyTestJob";
+               Configuration config = new Configuration();
+               config.set(PipelineOptions.NAME, jobName);
+               ExecutionEnvironment env = 
ExecutionEnvironment.createLocalEnvironment(config);
+               testJobName(jobName, env);
+       }
+
+       @Test
+       public void testUserDefinedJobNameWithConfigure() {
+               String jobName = "MyTestJob";
+               Configuration config = new Configuration();
+               config.set(PipelineOptions.NAME, jobName);
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               env.configure(config, this.getClass().getClassLoader());
+               testJobName(jobName, env);
+       }
+
+       private void testJobName(String prefixOfExpectedJobName, 
ExecutionEnvironment env) {
+               env.fromElements(1, 2, 3).writeAsText("/dev/null");
+               Plan plan = env.createProgramPlan();
+               
assertTrue(plan.getJobName().startsWith(prefixOfExpectedJobName));
+       }
 }
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 52a0d85..c3f3268 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -418,6 +418,8 @@ public class ExecutionEnvironment {
                                this.cacheFile.clear();
                                
this.cacheFile.addAll(DistributedCache.parseCachedFilesFromString(f));
                        });
+               configuration.getOptional(PipelineOptions.NAME)
+                       .ifPresent(jobName -> 
this.getConfiguration().set(PipelineOptions.NAME, jobName));
                config.configure(configuration, classLoader);
        }
 
@@ -870,7 +872,7 @@ public class ExecutionEnvironment {
         * @throws Exception Thrown, if the program executions fails.
         */
        public JobExecutionResult execute() throws Exception {
-               return execute(getDefaultName());
+               return execute(getJobName());
        }
 
        /**
@@ -945,7 +947,7 @@ public class ExecutionEnvironment {
         */
        @PublicEvolving
        public final JobClient executeAsync() throws Exception {
-               return executeAsync(getDefaultName());
+               return executeAsync(getJobName());
        }
 
        /**
@@ -998,7 +1000,7 @@ public class ExecutionEnvironment {
         * @throws Exception Thrown, if the compiler could not be instantiated.
         */
        public String getExecutionPlan() throws Exception {
-               Plan p = createProgramPlan(getDefaultName(), false);
+               Plan p = createProgramPlan(getJobName(), false);
                return ExecutionPlanUtil.getExecutionPlanAsJSON(p);
        }
 
@@ -1051,7 +1053,7 @@ public class ExecutionEnvironment {
         */
        @Internal
        public Plan createProgramPlan() {
-               return createProgramPlan(getDefaultName());
+               return createProgramPlan(getJobName());
        }
 
        /**
@@ -1122,12 +1124,13 @@ public class ExecutionEnvironment {
        }
 
        /**
-        * Gets a default job name, based on the timestamp when this method is 
invoked.
+        * Gets the job name. If no user-defined job name is found in the configuration,
+        * a default name based on the timestamp at which this method is invoked is returned.
         *
-        * @return A default job name.
+        * @return A job name.
         */
-       private static String getDefaultName() {
-               return "Flink Java Job at " + Calendar.getInstance().getTime();
+       private String getJobName() {
+               return configuration.getString(PipelineOptions.NAME, "Flink 
Java Job at " + Calendar.getInstance().getTime());
        }
 
        // 
--------------------------------------------------------------------------------------------

Reply via email to