This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit efa6cabc0f7bc13e424c69daf21e493563c81c83
Author: Timo Walther <twal...@apache.org>
AuthorDate: Mon Jul 29 11:42:20 2019 +0200

    [FLINK-13273][table-planner-blink] Allow retrieving StreamGraph from Blink executor
---
 .../flink/table/planner/delegation/BatchExecutor.java      |  6 ++----
 .../flink/table/planner/delegation/ExecutorBase.java       | 14 +++++++++++---
 .../flink/table/planner/delegation/StreamExecutor.java     |  5 ++---
 3 files changed, 15 insertions(+), 10 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java
index 953cd1e..04fcf0d 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java
@@ -51,7 +51,7 @@ public class BatchExecutor extends ExecutorBase {
        @Override
        public JobExecutionResult execute(String jobName) throws Exception {
                StreamExecutionEnvironment execEnv = getExecutionEnvironment();
-               StreamGraph streamGraph = generateStreamGraph(transformations, 
jobName);
+               StreamGraph streamGraph = generateStreamGraph(jobName);
                return execEnv.execute(streamGraph);
        }
 
@@ -69,9 +69,7 @@ public class BatchExecutor extends ExecutorBase {
                }
        }
 
-       /**
-        * Translates transformationList to streamGraph.
-        */
+       @Override
        public StreamGraph generateStreamGraph(List<Transformation<?>> 
transformations, String jobName) {
                StreamExecutionEnvironment execEnv = getExecutionEnvironment();
                setBatchProperties(execEnv);
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ExecutorBase.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ExecutorBase.java
index 701a6cc..10eeafd 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ExecutorBase.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ExecutorBase.java
@@ -58,9 +58,17 @@ public abstract class ExecutorBase implements Executor {
                return executionEnvironment;
        }
 
-       public abstract StreamGraph generateStreamGraph(
-                       List<Transformation<?>> transformations,
-                       String jobName) throws Exception;
+       /**
+        * Translates the applied transformations to a stream graph.
+        */
+       public StreamGraph generateStreamGraph(String jobName) {
+               return generateStreamGraph(transformations, jobName);
+       }
+
+       /**
+        * Translates the given transformations to a stream graph.
+        */
+       public abstract StreamGraph generateStreamGraph(List<Transformation<?>> 
transformations, String jobName);
 
        protected String getNonEmptyJobName(String jobName) {
                if (StringUtils.isNullOrWhitespaceOnly(jobName)) {
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/StreamExecutor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/StreamExecutor.java
index 4af2f8e..8d1e904 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/StreamExecutor.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/StreamExecutor.java
@@ -46,9 +46,8 @@ public class StreamExecutor extends ExecutorBase {
                return execEnv.execute(generateStreamGraph(transformations, 
jobName));
        }
 
-       public StreamGraph generateStreamGraph(
-                       List<Transformation<?>> transformations,
-                       String jobName) throws Exception {
+       @Override
+       public StreamGraph generateStreamGraph(List<Transformation<?>> 
transformations, String jobName) {
                transformations.forEach(getExecutionEnvironment()::addOperator);
                return 
getExecutionEnvironment().getStreamGraph(getNonEmptyJobName(jobName));
        }

Reply via email to