This is an automated email from the ASF dual-hosted git repository. godfrey pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5b73483bf091894c668378a58df9cfd6a584796e Author: godfreyhe <[email protected]> AuthorDate: Tue Nov 17 23:04:37 2020 +0800 [FLINK-18545][python] Specify job name by `pipeline.name` for flink-python (cherry picked from commit 56edecd09104b7f876bf2ca4f9e6c4a590d0253b) --- .../org/apache/flink/python/util/PythonConfigUtil.java | 4 +++- .../apache/flink/python/util/PythonConfigUtilTest.java | 17 +++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java index 18e5371..bf323ab 100644 --- a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java +++ b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java @@ -22,6 +22,7 @@ import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.dag.Transformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ExecutionOptions; +import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.core.memory.ManagedMemoryUseCase; import org.apache.flink.python.PythonConfig; import org.apache.flink.python.PythonOptions; @@ -153,7 +154,8 @@ public class PythonConfigUtil { } } - StreamGraph streamGraph = env.getStreamGraph(StreamExecutionEnvironment.DEFAULT_JOB_NAME, clearTransformations); + String jobName = getEnvironmentConfig(env).getString(PipelineOptions.NAME, StreamExecutionEnvironment.DEFAULT_JOB_NAME); + StreamGraph streamGraph = env.getStreamGraph(jobName, clearTransformations); Collection<StreamNode> streamNodes = streamGraph.getStreamNodes(); for (StreamNode streamNode : streamNodes) { alignStreamNode(streamNode, streamGraph); diff --git a/flink-python/src/test/java/org/apache/flink/python/util/PythonConfigUtilTest.java b/flink-python/src/test/java/org/apache/flink/python/util/PythonConfigUtilTest.java index a6c9863..4f1da09 100644 --- a/flink-python/src/test/java/org/apache/flink/python/util/PythonConfigUtilTest.java +++ b/flink-python/src/test/java/org/apache/flink/python/util/PythonConfigUtilTest.java @@ -18,12 +18,17 @@ package org.apache.flink.python.util; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.graph.StreamGraph; import org.junit.Test; import java.lang.reflect.InvocationTargetException; +import java.util.Collections; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; /** @@ -37,4 +42,16 @@ public class PythonConfigUtilTest { Configuration envConfig = PythonConfigUtil.getEnvConfigWithDependencies(executionEnvironment); assertNotNull(envConfig); } + + @Test + public void testJobName() throws IllegalAccessException, NoSuchMethodException, InvocationTargetException, NoSuchFieldException { + String jobName = "MyTestJob"; + Configuration config = new Configuration(); + config.set(PipelineOptions.NAME, jobName); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config); + + env.fromCollection(Collections.singletonList("test")).addSink(new DiscardingSink<>()); + StreamGraph streamGraph = PythonConfigUtil.generateStreamGraphWithDependencies(env, true); + assertEquals(jobName, streamGraph.getJobName()); + } }
