This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a5c3a20 [Feature][API-FLINK] flink core add blink planner (#992)
a5c3a20 is described below
commit a5c3a208b447e9cabfc95dbc1a5b3220da7c65d7
Author: zhaown <[email protected]>
AuthorDate: Mon Jan 10 16:45:30 2022 +0800
[Feature][API-FLINK] flink core add blink planner (#992)
* [Feature][API-FLINK] flink core add blink planner
* [Feature][API-FLINK] flink core add blink planner
Co-authored-by: zhaown <[email protected]>
---
pom.xml | 6 ++++++
seatunnel-apis/seatunnel-api-flink/pom.xml | 4 ++++
.../java/org/apache/seatunnel/flink/FlinkEnvironment.java | 13 ++++++++++++-
.../java/org/apache/seatunnel/flink/util/ConfigKeyName.java | 2 +-
4 files changed, 23 insertions(+), 2 deletions(-)
diff --git a/pom.xml b/pom.xml
index c36284b..ec3c859 100644
--- a/pom.xml
+++ b/pom.xml
@@ -172,6 +172,12 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>${flink.scope}</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>${flink.scope}</scope>
diff --git a/seatunnel-apis/seatunnel-api-flink/pom.xml b/seatunnel-apis/seatunnel-api-flink/pom.xml
index e676885..24833c3 100644
--- a/seatunnel-apis/seatunnel-api-flink/pom.xml
+++ b/seatunnel-apis/seatunnel-api-flink/pom.xml
@@ -47,6 +47,10 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
</dependency>
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
index 428b0b7..3c3c344 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.flink;
+import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.env.RuntimeEnv;
import org.apache.seatunnel.flink.util.ConfigKeyName;
@@ -104,7 +105,17 @@ public class FlinkEnvironment implements RuntimeEnv {
}
private void createStreamTableEnvironment() {
- tableEnvironment = StreamTableEnvironment.create(getStreamExecutionEnvironment());
+ // use blink and streammode
+ EnvironmentSettings.Builder envBuilder = EnvironmentSettings.newInstance()
+ .inStreamingMode();
+ if (this.config.hasPath(ConfigKeyName.PLANNER) && "blink".equals(this.config.getString(ConfigKeyName.PLANNER))) {
+ envBuilder.useBlinkPlanner();
+ } else {
+ envBuilder.useOldPlanner();
+ }
+ EnvironmentSettings environmentSettings = envBuilder.build();
+
+ tableEnvironment = StreamTableEnvironment.create(getStreamExecutionEnvironment(), environmentSettings);
TableConfig config = tableEnvironment.getConfig();
if (this.config.hasPath(ConfigKeyName.MAX_STATE_RETENTION_TIME) && this.config.hasPath(ConfigKeyName.MIN_STATE_RETENTION_TIME)) {
long max = this.config.getLong(ConfigKeyName.MAX_STATE_RETENTION_TIME);
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/ConfigKeyName.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/ConfigKeyName.java
index 883c378..4b7c559 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/ConfigKeyName.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/ConfigKeyName.java
@@ -39,5 +39,5 @@ public class ConfigKeyName {
public static final String MAX_STATE_RETENTION_TIME = "execution.query.state.max-retention";
public static final String MIN_STATE_RETENTION_TIME = "execution.query.state.min-retention";
public static final String STATE_BACKEND = "execution.state.backend";
-
+ public static final String PLANNER = "execution.planner";
}