This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a5c3a20  [Feature][API-FLINK] flink core add blink planner (#992)
a5c3a20 is described below

commit a5c3a208b447e9cabfc95dbc1a5b3220da7c65d7
Author: zhaown <[email protected]>
AuthorDate: Mon Jan 10 16:45:30 2022 +0800

    [Feature][API-FLINK] flink core add blink planner (#992)
    
    * [Feature][API-FLINK] flink core add blink planner
    
    * [Feature][API-FLINK] flink core add blink planner
    
    Co-authored-by: zhaown <[email protected]>
---
 pom.xml                                                     |  6 ++++++
 seatunnel-apis/seatunnel-api-flink/pom.xml                  |  4 ++++
 .../java/org/apache/seatunnel/flink/FlinkEnvironment.java   | 13 ++++++++++++-
 .../java/org/apache/seatunnel/flink/util/ConfigKeyName.java |  2 +-
 4 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/pom.xml b/pom.xml
index c36284b..ec3c859 100644
--- a/pom.xml
+++ b/pom.xml
@@ -172,6 +172,12 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.flink</groupId>
+                <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+                <version>${flink.version}</version>
+                <scope>${flink.scope}</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.flink</groupId>
                 <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
                 <version>${flink.version}</version>
                 <scope>${flink.scope}</scope>
diff --git a/seatunnel-apis/seatunnel-api-flink/pom.xml b/seatunnel-apis/seatunnel-api-flink/pom.xml
index e676885..24833c3 100644
--- a/seatunnel-apis/seatunnel-api-flink/pom.xml
+++ b/seatunnel-apis/seatunnel-api-flink/pom.xml
@@ -47,6 +47,10 @@
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
             <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
         </dependency>
 
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
index 428b0b7..3c3c344 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.flink;
 
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.env.RuntimeEnv;
 import org.apache.seatunnel.flink.util.ConfigKeyName;
@@ -104,7 +105,17 @@ public class FlinkEnvironment implements RuntimeEnv {
     }
 
     private void createStreamTableEnvironment() {
-        tableEnvironment = StreamTableEnvironment.create(getStreamExecutionEnvironment());
+        // use blink and streammode
+        EnvironmentSettings.Builder envBuilder = EnvironmentSettings.newInstance()
+                .inStreamingMode();
+        if (this.config.hasPath(ConfigKeyName.PLANNER) && "blink".equals(this.config.getString(ConfigKeyName.PLANNER))) {
+            envBuilder.useBlinkPlanner();
+        } else {
+            envBuilder.useOldPlanner();
+        }
+        EnvironmentSettings environmentSettings = envBuilder.build();
+
+        tableEnvironment = StreamTableEnvironment.create(getStreamExecutionEnvironment(), environmentSettings);
         TableConfig config = tableEnvironment.getConfig();
         if (this.config.hasPath(ConfigKeyName.MAX_STATE_RETENTION_TIME) && this.config.hasPath(ConfigKeyName.MIN_STATE_RETENTION_TIME)) {
             long max = this.config.getLong(ConfigKeyName.MAX_STATE_RETENTION_TIME);
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/ConfigKeyName.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/ConfigKeyName.java
index 883c378..4b7c559 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/ConfigKeyName.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/ConfigKeyName.java
@@ -39,5 +39,5 @@ public class ConfigKeyName {
     public static final String MAX_STATE_RETENTION_TIME = "execution.query.state.max-retention";
     public static final String MIN_STATE_RETENTION_TIME = "execution.query.state.min-retention";
     public static final String STATE_BACKEND = "execution.state.backend";
-
+    public static final String PLANNER = "execution.planner";
 }

Reply via email to