This is an automated email from the ASF dual-hosted git repository. godfrey pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2c8e9d42cea40fd01be5cbdfdb791a4ecb5cdd07 Author: godfreyhe <[email protected]> AuthorDate: Tue Nov 17 22:50:50 2020 +0800 [FLINK-18545][configuration] Introduce `pipeline.name` to allow users to specify job name by configuration (cherry picked from commit 78d6ef6f41b24977c531cfcb430cf184edc6f5ed) --- .../generated/pipeline_configuration.html | 6 ++++ .../flink/configuration/PipelineOptions.java | 10 +++++++ .../environment/StreamExecutionEnvironment.java | 15 +++++++--- .../api/StreamExecutionEnvironmentTest.java | 33 ++++++++++++++++++++++ 4 files changed, 60 insertions(+), 4 deletions(-) diff --git a/docs/_includes/generated/pipeline_configuration.html b/docs/_includes/generated/pipeline_configuration.html index f3fc216..a7f970c 100644 --- a/docs/_includes/generated/pipeline_configuration.html +++ b/docs/_includes/generated/pipeline_configuration.html @@ -87,6 +87,12 @@ <td>The program-wide maximum parallelism used for operators which haven't specified a maximum parallelism. The maximum parallelism specifies the upper limit for dynamic scaling and the number of key groups used for partitioned state.</td> </tr> <tr> + <td><h5>pipeline.name</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The job name used for printing and logging.</td> + </tr> + <tr> <td><h5>pipeline.object-reuse</h5></td> <td style="word-wrap: break-word;">false</td> <td>Boolean</td> diff --git a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java index 3c405b3..a687cba 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptions.java @@ -36,6 +36,16 @@ import static org.apache.flink.configuration.description.TextElement.text; */ @PublicEvolving public class PipelineOptions { + + /** + * The job name used for printing and logging. + */ + public static final ConfigOption<String> NAME = + key("pipeline.name") + .stringType() + .noDefaultValue() + .withDescription("The job name used for printing and logging."); + /** * A list of jar files that contain the user-defined function (UDF) classes and all classes used from within the UDFs. */ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 9effe65..f458c27 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -839,6 +839,9 @@ public class StreamExecutionEnvironment { configuration.getOptional(ExecutionOptions.USE_BATCH_STATE_BACKEND).ifPresent( sortInputs -> this.getConfiguration().set(ExecutionOptions.USE_BATCH_STATE_BACKEND, sortInputs) ); + configuration.getOptional(PipelineOptions.NAME).ifPresent( + jobName -> this.getConfiguration().set(PipelineOptions.NAME, jobName) + ); config.configure(configuration, classLoader); checkpointCfg.configure(configuration); } @@ -1798,7 +1801,7 @@ public class StreamExecutionEnvironment { * @throws Exception which occurs during job execution. */ public JobExecutionResult execute() throws Exception { - return execute(DEFAULT_JOB_NAME); + return execute(getJobName()); } /** @@ -1891,7 +1894,7 @@ public class StreamExecutionEnvironment { */ @PublicEvolving public final JobClient executeAsync() throws Exception { - return executeAsync(DEFAULT_JOB_NAME); + return executeAsync(getJobName()); } /** @@ -1958,7 +1961,7 @@ public class StreamExecutionEnvironment { */ @Internal public StreamGraph getStreamGraph() { - return getStreamGraph(DEFAULT_JOB_NAME); + return getStreamGraph(getJobName()); } /** @@ -2018,7 +2021,7 @@ public class StreamExecutionEnvironment { * @return The execution plan of the program, as a JSON String. */ public String getExecutionPlan() { - return getStreamGraph(DEFAULT_JOB_NAME, false).getStreamingPlanAsJSON(); + return getStreamGraph(getJobName(), false).getStreamingPlanAsJSON(); } /** @@ -2350,4 +2353,8 @@ public class StreamExecutionEnvironment { } return (T) resolvedTypeInfo; } + + private String getJobName() { + return configuration.getString(PipelineOptions.NAME, DEFAULT_JOB_NAME); + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java index d0bc4b0..e8d5add 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java @@ -23,6 +23,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; @@ -270,6 +272,37 @@ public class StreamExecutionEnvironmentTest { } @Test + public void testDefaultJobName() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + testJobName(StreamExecutionEnvironment.DEFAULT_JOB_NAME, env); + } + + @Test + public void testUserDefinedJobName() { + String jobName = "MyTestJob"; + Configuration config = new Configuration(); + config.set(PipelineOptions.NAME, jobName); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config); + testJobName(jobName, env); + } + + @Test + public void testUserDefinedJobNameWithConfigure() { + String jobName = "MyTestJob"; + Configuration config = new Configuration(); + config.set(PipelineOptions.NAME, jobName); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.configure(config, this.getClass().getClassLoader()); + testJobName(jobName, env); + } + + private void testJobName(String expectedJobName, StreamExecutionEnvironment env) { + env.fromElements(1, 2, 3).print(); + StreamGraph streamGraph = env.getStreamGraph(); + assertEquals(expectedJobName, streamGraph.getJobName()); + } + + @Test public void testAddSourceWithUserDefinedTypeInfo() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Row> source1 = env.addSource(new RowSourceFunction(), Types.ROW(Types.STRING));
