This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5b73483bf091894c668378a58df9cfd6a584796e
Author: godfreyhe <[email protected]>
AuthorDate: Tue Nov 17 23:04:37 2020 +0800

    [FLINK-18545][python] Specify job name by `pipeline.name` for flink-python
    
    (cherry picked from commit 56edecd09104b7f876bf2ca4f9e6c4a590d0253b)
---
 .../org/apache/flink/python/util/PythonConfigUtil.java  |  4 +++-
 .../apache/flink/python/util/PythonConfigUtilTest.java  | 17 +++++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
index 18e5371..bf323ab 100644
--- a/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
+++ b/flink-python/src/main/java/org/apache/flink/python/util/PythonConfigUtil.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.core.memory.ManagedMemoryUseCase;
 import org.apache.flink.python.PythonConfig;
 import org.apache.flink.python.PythonOptions;
@@ -153,7 +154,8 @@ public class PythonConfigUtil {
                        }
                }
 
-               StreamGraph streamGraph = env.getStreamGraph(StreamExecutionEnvironment.DEFAULT_JOB_NAME, clearTransformations);
+               String jobName = getEnvironmentConfig(env).getString(PipelineOptions.NAME, StreamExecutionEnvironment.DEFAULT_JOB_NAME);
+               StreamGraph streamGraph = env.getStreamGraph(jobName, clearTransformations);
                Collection<StreamNode> streamNodes = streamGraph.getStreamNodes();
                for (StreamNode streamNode : streamNodes) {
                        alignStreamNode(streamNode, streamGraph);
diff --git a/flink-python/src/test/java/org/apache/flink/python/util/PythonConfigUtilTest.java b/flink-python/src/test/java/org/apache/flink/python/util/PythonConfigUtilTest.java
index a6c9863..4f1da09 100644
--- a/flink-python/src/test/java/org/apache/flink/python/util/PythonConfigUtilTest.java
+++ b/flink-python/src/test/java/org/apache/flink/python/util/PythonConfigUtilTest.java
@@ -18,12 +18,17 @@
 package org.apache.flink.python.util;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.graph.StreamGraph;
 
 import org.junit.Test;
 
 import java.lang.reflect.InvocationTargetException;
+import java.util.Collections;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
 /**
@@ -37,4 +42,16 @@ public class PythonConfigUtilTest {
                Configuration envConfig = PythonConfigUtil.getEnvConfigWithDependencies(executionEnvironment);
                assertNotNull(envConfig);
        }
+
+       @Test
+       public void testJobName() throws IllegalAccessException, NoSuchMethodException, InvocationTargetException, NoSuchFieldException {
+               String jobName = "MyTestJob";
+               Configuration config = new Configuration();
+               config.set(PipelineOptions.NAME, jobName);
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+
+               env.fromCollection(Collections.singletonList("test")).addSink(new DiscardingSink<>());
+               StreamGraph streamGraph = PythonConfigUtil.generateStreamGraphWithDependencies(env, true);
+               assertEquals(jobName, streamGraph.getJobName());
+       }
 }

Reply via email to