This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fea90644005d32af5a5b8ff79c2baafad687d3f7
Author: Timo Walther <twal...@apache.org>
AuthorDate: Mon Jul 29 14:30:10 2019 +0200

    [FLINK-13273][sql-client] Allow switching planners in SQL Client
    
    This closes #9266.
---
 docs/dev/table/sqlClient.md                        |  53 ++++++-----
 flink-end-to-end-tests/run-nightly-tests.sh        |   3 +-
 .../test-scripts/test_sql_client.sh                |  17 ++--
 .../flink-sql-client/conf/sql-client-defaults.yaml |  40 ++++----
 flink-table/flink-sql-client/pom.xml               |  12 +++
 .../client/config/entries/ExecutionEntry.java      |  90 +++++++++++++++---
 .../flink/table/client/gateway/SessionContext.java |   8 +-
 .../gateway/local/CollectBatchTableSink.java       |  19 ++--
 .../gateway/local/CollectStreamTableSink.java      |   8 +-
 .../client/gateway/local/ExecutionContext.java     | 104 ++++++++++++++++++---
 .../table/client/gateway/local/LocalExecutor.java  |   3 +-
 .../table/client/gateway/local/ResultStore.java    |   2 +-
 .../client/gateway/local/EnvironmentTest.java      |   3 +-
 .../client/gateway/local/ExecutionContextTest.java |   2 +
 .../client/gateway/local/LocalExecutorITCase.java  |  26 +++++-
 .../test/resources/test-sql-client-defaults.yaml   |  15 +--
 tools/travis/splits/split_misc.sh                  |   3 +-
 17 files changed, 306 insertions(+), 102 deletions(-)

diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index cd69cce..8ec569d 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -199,9 +199,24 @@ functions:
       - 7.6
       - false
 
+# Define available catalogs
+
+catalogs:
+   - name: catalog_1
+     type: hive
+     property-version: 1
+     hive-conf-dir: ...
+   - name: catalog_2
+     type: hive
+     property-version: 1
+     default-database: mydb2
+     hive-conf-dir: ...
+     hive-version: 1.2.1
+
 # Execution properties allow for changing the behavior of a table program.
 
 execution:
+  planner: old                      # optional: either 'old' (default) or 'blink'
   type: streaming                   # required: execution mode either 'batch' or 'streaming'
   result-mode: table                # required: either 'table' or 'changelog'
   max-table-result-rows: 1000000    # optional: maximum number of maintained rows in
@@ -212,29 +227,16 @@ execution:
   max-parallelism: 16               # optional: Flink's maximum parallelism (128 by default)
   min-idle-state-retention: 0       # optional: table program's minimum idle state time
   max-idle-state-retention: 0       # optional: table program's maximum idle state time
+  current-catalog: catalog_1        # optional: name of the current catalog of the session ('default_catalog' by default)
+  current-database: mydb1           # optional: name of the current database of the current catalog
+                                    #   (default database of the current catalog by default)
   restart-strategy:                 # optional: restart strategy
     type: fallback                  #   "fallback" to global restart strategy by default
-  current-catalog: catalog_1        # optional: name of the current catalog of the session ("default_catalog" by default)
-  current-database: mydb1           # optional: name of the current database of the current catalog (default value is the default database name of the current catalog)
 
 # Deployment properties allow for describing the cluster to which table programs are submitted to.
 
 deployment:
   response-timeout: 5000
-
-# Catalogs
-
-catalogs:
-   - name: catalog_1
-     type: hive
-     property-version: 1
-     hive-conf-dir: ...
-   - name: catalog_2
-     type: hive
-     property-version: 1
-     default-database: mydb2        # optional: name of default database of this catalog
-     hive-conf-dir: ...             # optional: path of Hive conf directory. (Default value is created by HiveConf)
-     hive-version: 1.2.1            # optional: version of Hive (2.3.4 by default)
 {% endhighlight %}
 
 This configuration:
@@ -242,11 +244,10 @@ This configuration:
 - defines an environment with a table source `MyTableSource` that reads from a CSV file,
 - defines a view `MyCustomView` that declares a virtual table using a SQL query,
 - defines a user-defined function `myUDF` that can be instantiated using the class name and two constructor parameters,
+- connects to two Hive catalogs and uses `catalog_1` as the current catalog with `mydb1` as the current database of the catalog,
 - specifies a parallelism of 1 for queries executed in this streaming environment,
 - specifies an event-time characteristic, and
 - runs queries in the `table` result mode.
-- creates two `HiveCatalog` (type: hive) named with their own default databases and specified Hive conf directory. Hive version of the first `HiveCatalog` is `2.3.4` by default and that of the second one is specified as `1.2.1`.
-- use `catalog_1` as the current catalog of the environment upon start, and `mydb1` as the current database of the catalog.
 
 Depending on the use case, a configuration can be split into multiple files. Therefore, environment files can be created for general purposes (*defaults environment file* using `--defaults`) as well as on a per-session basis (*session environment file* using `--environment`). Every CLI session is initialized with the default properties followed by the session properties. For example, the defaults environment file could specify all table sources that should be available for querying in ev [...]
 
@@ -431,16 +432,11 @@ This process can be recursively performed until all the constructor parameters a
 Catalogs
 --------
 
-Catalogs can be defined as a set of yaml properties and are automatically registered to the environment upon starting SQL Client.
+Catalogs can be defined as a set of YAML properties and are automatically registered to the environment upon starting SQL Client.
 
-Users can specify in section `execution` that which catalog they want to use as the current catalog in SQL CLI, and which database of the catalog they want to use as the current database. 
+Users can specify which catalog they want to use as the current catalog in SQL CLI, and which database of the catalog they want to use as the current database.
 
 {% highlight yaml %}
-execution:
-   ...
-   current-catalog: catalog_1
-   current-database: mydb1
-
 catalogs:
    - name: catalog_1
      type: hive
@@ -452,9 +448,12 @@ catalogs:
      type: hive
      property-version: 1
      hive-conf-dir: <path of Hive conf directory>
-{% endhighlight %}
 
-Currently Flink supports two types of catalog - `FlinkInMemoryCatalog` and `HiveCatalog`.
+execution:
+   ...
+   current-catalog: catalog_1
+   current-database: mydb1
+{% endhighlight %}
 
 For more information about catalog, see [Catalogs]({{ site.baseurl }}/dev/table/catalog.html).
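
For context, the `execution.planner` option documented above maps onto the Table API's `EnvironmentSettings`. A minimal sketch of the programmatic equivalent, assuming the standard Flink 1.9 Table API (the class name `PlannerConfigSketch` is illustrative and not part of this patch):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public final class PlannerConfigSketch {
        public static void main(String[] args) {
            // Same choice as "planner: blink" in the environment file;
            // use useOldPlanner() for the 'old' (default) planner.
            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()   // matches "type: streaming"
                .build();

            // Creates a table environment backed by the selected planner.
            TableEnvironment tableEnv = TableEnvironment.create(settings);
        }
    }

The SQL Client performs the same selection internally based on the YAML value (see ExecutionEntry#getEnvironmentSettings in the patch below).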
 
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 653803a..b8a1a40 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -146,7 +146,8 @@ run_test "Avro Confluent Schema Registry nightly end-to-end test" "$END_TO_END_D
 run_test "State TTL Heap backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh file"
 run_test "State TTL RocksDb backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh rocks"
 
-run_test "SQL Client end-to-end test" "$END_TO_END_DIR/test-scripts/test_sql_client.sh"
+run_test "SQL Client end-to-end test (Old planner)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh old"
+run_test "SQL Client end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh blink"
 run_test "SQL Client end-to-end test for Kafka 0.10" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka010.sh"
 run_test "SQL Client end-to-end test for Kafka 0.11" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka011.sh"
 run_test "SQL Client end-to-end test for modern Kafka" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka.sh"
diff --git a/flink-end-to-end-tests/test-scripts/test_sql_client.sh b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
index 3503054..5798545 100755
--- a/flink-end-to-end-tests/test-scripts/test_sql_client.sh
+++ b/flink-end-to-end-tests/test-scripts/test_sql_client.sh
@@ -19,6 +19,8 @@
 
 set -Eeuo pipefail
 
+PLANNER="${1:-old}"
+
 KAFKA_VERSION="2.2.0"
 CONFLUENT_VERSION="5.0.0"
 CONFLUENT_MAJOR_VERSION="5.0"
@@ -185,6 +187,9 @@ functions:
   - name: RegReplace
     from: class
     class: org.apache.flink.table.toolbox.StringRegexReplaceFunction
+
+execution:
+  planner: "$PLANNER"
 EOF
 
 # submit SQL statements
@@ -194,7 +199,7 @@ echo "Executing SQL: Values -> Elasticsearch (upsert)"
 SQL_STATEMENT_3=$(cat << EOF
 INSERT INTO ElasticsearchUpsertSinkTable
   SELECT user_id, user_name, COUNT(*) AS user_count
-  FROM (VALUES (1, 'Bob'), (22, 'Alice'), (42, 'Greg'), (42, 'Greg'), (42, 'Greg'), (1, 'Bob'))
+  FROM (VALUES (1, 'Bob'), (22, 'Tom'), (42, 'Kim'), (42, 'Kim'), (42, 'Kim'), (1, 'Bob'))
     AS UserCountTable(user_id, user_name)
   GROUP BY user_id, user_name
 EOF
@@ -208,7 +213,7 @@ JOB_ID=$($FLINK_DIR/bin/sql-client.sh embedded \
 
 wait_job_terminal_state "$JOB_ID" "FINISHED"
 
-verify_result_hash "SQL Client Elasticsearch Upsert" "$ELASTICSEARCH_INDEX" 3 "982cb32908def9801e781381c1b8a8db"
+verify_result_hash "SQL Client Elasticsearch Upsert" "$ELASTICSEARCH_INDEX" 3 "21a76360e2a40f442816d940e7071ccf"
 
 echo "Executing SQL: Values -> Elasticsearch (append, no key)"
 
@@ -218,10 +223,10 @@ INSERT INTO ElasticsearchAppendSinkTable
   FROM (
     VALUES
       (1, 'Bob', CAST(0 AS BIGINT)),
-      (22, 'Alice', CAST(0 AS BIGINT)),
-      (42, 'Greg', CAST(0 AS BIGINT)),
-      (42, 'Greg', CAST(0 AS BIGINT)),
-      (42, 'Greg', CAST(0 AS BIGINT)),
+      (22, 'Tom', CAST(0 AS BIGINT)),
+      (42, 'Kim', CAST(0 AS BIGINT)),
+      (42, 'Kim', CAST(0 AS BIGINT)),
+      (42, 'Kim', CAST(0 AS BIGINT)),
       (1, 'Bob', CAST(0 AS BIGINT)))
     AS UserCountTable(user_id, user_name, user_count)
 EOF
diff --git a/flink-table/flink-sql-client/conf/sql-client-defaults.yaml b/flink-table/flink-sql-client/conf/sql-client-defaults.yaml
index 2ce3af6..db6a0ea 100644
--- a/flink-table/flink-sql-client/conf/sql-client-defaults.yaml
+++ b/flink-table/flink-sql-client/conf/sql-client-defaults.yaml
@@ -50,6 +50,7 @@ tables: [] # empty list
 #   time-attribute: ...
 #   primary-key: ...
 
+
 #==============================================================================
 # User-defined functions
 #==============================================================================
@@ -63,6 +64,21 @@ functions: [] # empty list
 #   class: ...
 #   constructor: ...
 
+
+#==============================================================================
+# Catalogs
+#==============================================================================
+
+# Define catalogs here.
+
+catalogs: [] # empty list
+# A typical catalog definition looks like:
+#  - name: myhive
+#    type: hive
+#    hive-conf-dir: /opt/hive_conf/
+#    default-database: ...
+
+
 #==============================================================================
 # Execution properties
 #==============================================================================
@@ -70,6 +86,9 @@ functions: [] # empty list
 # Execution properties allow for changing the behavior of a table program.
 
 execution:
+  # select the implementation responsible for planning table programs
+  # possible values are 'old' (used by default) or 'blink'
+  planner: old
   # 'batch' or 'streaming' execution
   type: streaming
   # allow 'event-time' or only 'processing-time' in sources
@@ -88,15 +107,15 @@ execution:
   min-idle-state-retention: 0
   # maximum idle state retention in ms
   max-idle-state-retention: 0
+  # current catalog ('default_catalog' by default)
+  current-catalog: default_catalog
+  # current database of the current catalog (default database of the catalog by default)
+  current-database: default_database
   # controls how table programs are restarted in case of a failures
   restart-strategy:
     # strategy type
     # possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default)
     type: fallback
-  # current catalog of SQL Client
-#  current-catalog: ...
-  # current database of the current catalog
-#  current-database: ...
 
 
 #==============================================================================
@@ -113,16 +132,3 @@ deployment:
   gateway-address: ""
   # (optional) port from cluster to gateway
   gateway-port: 0
-
-
-#==============================================================================
-# Catalogs
-#==============================================================================
-
-# Define catalogs here.
-
-catalogs: [] # empty list
-#  - name: myhive
-#    type: hive
-#    hive-conf-dir: /opt/hive_conf/
-#    default-database: ...
diff --git a/flink-table/flink-sql-client/pom.xml b/flink-table/flink-sql-client/pom.xml
index f725ff0..d5e95fc 100644
--- a/flink-table/flink-sql-client/pom.xml
+++ b/flink-table/flink-sql-client/pom.xml
@@ -84,6 +84,18 @@ under the License.
                        <version>${project.version}</version>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
                <!-- logging utilities -->
                <dependency>
                        <groupId>org.slf4j</groupId>
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java
index 80d3efb..a65642b 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ExecutionEntry.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.client.config.entries;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.client.config.ConfigUtil;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.descriptors.DescriptorProperties;
@@ -49,6 +50,12 @@ public class ExecutionEntry extends ConfigEntry {
        public static final ExecutionEntry DEFAULT_INSTANCE =
                new ExecutionEntry(new DescriptorProperties(true));
 
+       private static final String EXECUTION_PLANNER = "planner";
+
+       public static final String EXECUTION_PLANNER_VALUE_OLD = "old";
+
+       public static final String EXECUTION_PLANNER_VALUE_BLINK = "blink";
+
        private static final String EXECUTION_TYPE = "type";
 
        private static final String EXECUTION_TYPE_VALUE_STREAMING = 
"streaming";
@@ -97,9 +104,9 @@ public class ExecutionEntry extends ConfigEntry {
 
        private static final String 
EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL = 
"restart-strategy.max-failures-per-interval";
 
-       public static final String EXECUTION_CURRNET_CATALOG = 
"current-catalog";
+       public static final String EXECUTION_CURRENT_CATALOG = 
"current-catalog";
 
-       public static final String EXECUTION_CURRNET_DATABASE = 
"current-database";
+       public static final String EXECUTION_CURRENT_DATABASE = 
"current-database";
 
        private ExecutionEntry(DescriptorProperties properties) {
                super(properties);
@@ -108,6 +115,12 @@ public class ExecutionEntry extends ConfigEntry {
        @Override
        protected void validate(DescriptorProperties properties) {
                properties.validateEnumValues(
+                       EXECUTION_PLANNER,
+                       true,
+                       Arrays.asList(
+                               EXECUTION_PLANNER_VALUE_OLD,
+                               EXECUTION_PLANNER_VALUE_BLINK));
+               properties.validateEnumValues(
                        EXECUTION_TYPE,
                        true,
                        Arrays.asList(
@@ -137,20 +150,73 @@ public class ExecutionEntry extends ConfigEntry {
                properties.validateLong(EXECUTION_RESTART_STRATEGY_DELAY, true, 
0);
                
properties.validateLong(EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL, true, 
1);
                
properties.validateInt(EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL, 
true, 1);
-               properties.validateString(EXECUTION_CURRNET_CATALOG, true, 1);
-               properties.validateString(EXECUTION_CURRNET_DATABASE, true, 1);
+               properties.validateString(EXECUTION_CURRENT_CATALOG, true, 1);
+               properties.validateString(EXECUTION_CURRENT_DATABASE, true, 1);
+       }
+
+       public EnvironmentSettings getEnvironmentSettings() {
+               final EnvironmentSettings.Builder builder = 
EnvironmentSettings.newInstance();
+
+               if (inStreamingMode()) {
+                       builder.inStreamingMode();
+               } else if (inBatchMode()) {
+                       builder.inBatchMode();
+               }
+
+               final String planner = 
properties.getOptionalString(EXECUTION_PLANNER)
+                       .orElse(EXECUTION_PLANNER_VALUE_OLD);
+
+               if (planner.equals(EXECUTION_PLANNER_VALUE_OLD)) {
+                       builder.useOldPlanner();
+               } else if (planner.equals(EXECUTION_PLANNER_VALUE_BLINK)) {
+                       builder.useBlinkPlanner();
+               }
+
+               return builder.build();
        }
 
-       public boolean isStreamingExecution() {
+       public boolean inStreamingMode() {
                return properties.getOptionalString(EXECUTION_TYPE)
-                       .map((v) -> v.equals(EXECUTION_TYPE_VALUE_STREAMING))
-                       .orElse(false);
+                               .map((v) -> 
v.equals(EXECUTION_TYPE_VALUE_STREAMING))
+                               .orElse(false);
        }
 
-       public boolean isBatchExecution() {
+       public boolean inBatchMode() {
                return properties.getOptionalString(EXECUTION_TYPE)
-                       .map((v) -> v.equals(EXECUTION_TYPE_VALUE_BATCH))
-                       .orElse(false);
+                               .map((v) -> 
v.equals(EXECUTION_TYPE_VALUE_BATCH))
+                               .orElse(false);
+       }
+
+       public boolean isStreamingPlanner() {
+               final String planner = 
properties.getOptionalString(EXECUTION_PLANNER)
+                       .orElse(EXECUTION_PLANNER_VALUE_OLD);
+
+               // Blink planner is a streaming planner
+               if (planner.equals(EXECUTION_PLANNER_VALUE_BLINK)) {
+                       return true;
+               }
+               // Old planner can be a streaming or batch planner
+               else if (planner.equals(EXECUTION_PLANNER_VALUE_OLD)) {
+                       return inStreamingMode();
+               }
+
+               return false;
+       }
+
+       public boolean isBatchPlanner() {
+               final String planner = 
properties.getOptionalString(EXECUTION_PLANNER)
+                       .orElse(EXECUTION_PLANNER_VALUE_OLD);
+
+               // Blink planner is not a batch planner
+               if (planner.equals(EXECUTION_PLANNER_VALUE_BLINK)) {
+                       return false;
+               }
+               // Old planner can be a streaming or batch planner
+               else if (planner.equals(EXECUTION_PLANNER_VALUE_OLD)) {
+                       return inBatchMode();
+               }
+
+               return false;
        }
 
        public TimeCharacteristic getTimeCharacteristic() {
@@ -237,11 +303,11 @@ public class ExecutionEntry extends ConfigEntry {
        }
 
        public Optional<String> getCurrentCatalog() {
-               return properties.getOptionalString(EXECUTION_CURRNET_CATALOG);
+               return properties.getOptionalString(EXECUTION_CURRENT_CATALOG);
        }
 
        public Optional<String> getCurrentDatabase() {
-               return properties.getOptionalString(EXECUTION_CURRNET_DATABASE);
+               return properties.getOptionalString(EXECUTION_CURRENT_DATABASE);
        }
 
        public boolean isChangelogMode() {
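
To summarize the decision logic that getEnvironmentSettings(), inStreamingMode()/inBatchMode() and isStreamingPlanner()/isBatchPlanner() encode above: the Blink planner always plans in a streaming fashion, while the old planner follows the configured execution type. A self-contained restatement (illustrative only; the real class reads 'planner' and 'type' from DescriptorProperties):

    // Standalone restatement of ExecutionEntry's planner/mode decision rules.
    public final class PlannerModeSketch {

        static boolean isStreamingPlanner(String planner, String type) {
            // The Blink planner always plans in a streaming fashion ...
            if ("blink".equals(planner)) {
                return true;
            }
            // ... while the old planner follows the execution type.
            return "old".equals(planner) && "streaming".equals(type);
        }

        static boolean isBatchPlanner(String planner, String type) {
            // Only the old planner exposes a dedicated batch planner here.
            return "old".equals(planner) && "batch".equals(type);
        }

        public static void main(String[] args) {
            System.out.println(isStreamingPlanner("blink", "batch")); // true
            System.out.println(isBatchPlanner("old", "batch"));       // true
            System.out.println(isBatchPlanner("blink", "batch"));     // false
        }
    }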
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
index d2a7da2..11f4224 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/SessionContext.java
@@ -79,23 +79,23 @@ public class SessionContext {
        }
 
        public Optional<String> getCurrentCatalog() {
-               return 
Optional.ofNullable(sessionProperties.get(ExecutionEntry.EXECUTION_CURRNET_CATALOG));
+               return 
Optional.ofNullable(sessionProperties.get(ExecutionEntry.EXECUTION_CURRENT_CATALOG));
        }
 
        public void setCurrentCatalog(String currentCatalog) {
                
checkArgument(!StringUtils.isNullOrWhitespaceOnly(currentCatalog));
 
-               sessionProperties.put(ExecutionEntry.EXECUTION_CURRNET_CATALOG, 
currentCatalog);
+               sessionProperties.put(ExecutionEntry.EXECUTION_CURRENT_CATALOG, 
currentCatalog);
        }
 
        public Optional<String> getCurrentDatabase() {
-               return 
Optional.ofNullable(sessionProperties.get(ExecutionEntry.EXECUTION_CURRNET_DATABASE));
+               return 
Optional.ofNullable(sessionProperties.get(ExecutionEntry.EXECUTION_CURRENT_DATABASE));
        }
 
        public void setCurrentDatabase(String currentDatabase) {
                
checkArgument(!StringUtils.isNullOrWhitespaceOnly(currentDatabase));
 
-               
sessionProperties.put(ExecutionEntry.EXECUTION_CURRNET_DATABASE, 
currentDatabase);
+               
sessionProperties.put(ExecutionEntry.EXECUTION_CURRENT_DATABASE, 
currentDatabase);
        }
 
        public Environment getEnvironment() {
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectBatchTableSink.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectBatchTableSink.java
index 55f0038..9dccfb7 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectBatchTableSink.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectBatchTableSink.java
@@ -18,18 +18,20 @@
 
 package org.apache.flink.table.client.gateway.local;
 
+import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.table.sinks.BatchTableSink;
+import org.apache.flink.table.sinks.OutputFormatTableSink;
 import org.apache.flink.types.Row;
 
 /**
  * Table sink for collecting the results locally all at once using 
accumulators.
  */
-public class CollectBatchTableSink implements BatchTableSink<Row> {
+public class CollectBatchTableSink extends OutputFormatTableSink<Row> 
implements BatchTableSink<Row> {
 
        private final String accumulatorName;
        private final TypeSerializer<Row> serializer;
@@ -42,6 +44,13 @@ public class CollectBatchTableSink implements 
BatchTableSink<Row> {
                this.serializer = serializer;
        }
 
+       /**
+        * Returns the serializer for deserializing the collected result.
+        */
+       public TypeSerializer<Row> getSerializer() {
+               return serializer;
+       }
+
        @Override
        public TypeInformation<Row> getOutputType() {
                return Types.ROW_NAMED(fieldNames, fieldTypes);
@@ -72,10 +81,8 @@ public class CollectBatchTableSink implements 
BatchTableSink<Row> {
                        .name("SQL Client Batch Collect Sink");
        }
 
-       /**
-        * Returns the serializer for deserializing the collected result.
-        */
-       public TypeSerializer<Row> getSerializer() {
-               return serializer;
+       @Override
+       public OutputFormat<Row> getOutputFormat() {
+               return new Utils.CollectHelper<>(accumulatorName, serializer);
        }
 }
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java
index e36eb35..ce8565a 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/CollectStreamTableSink.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.experimental.CollectSink;
 import org.apache.flink.table.sinks.RetractStreamTableSink;
 import org.apache.flink.types.Row;
@@ -73,8 +74,13 @@ public class CollectStreamTableSink implements 
RetractStreamTableSink<Row> {
 
        @Override
        public void emitDataStream(DataStream<Tuple2<Boolean, Row>> stream) {
+               consumeDataStream(stream);
+       }
+
+       @Override
+       public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, 
Row>> stream) {
                // add sink
-               stream
+               return stream
                        .addSink(new CollectSink<>(targetAddress, targetPort, 
serializer))
                        .name("SQL Client Stream Collect Sink")
                        .setParallelism(1);
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index ae7fb2c..f212e98 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -39,13 +39,20 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.BatchQueryConfig;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.QueryConfig;
 import org.apache.flink.table.api.StreamQueryConfig;
 import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
 import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.config.entries.DeploymentEntry;
 import org.apache.flink.table.client.config.entries.ExecutionEntry;
@@ -56,17 +63,23 @@ import 
org.apache.flink.table.client.config.entries.TemporalTableEntry;
 import org.apache.flink.table.client.config.entries.ViewEntry;
 import org.apache.flink.table.client.gateway.SessionContext;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.ExecutorFactory;
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.delegation.PlannerFactory;
 import org.apache.flink.table.factories.BatchTableSinkFactory;
 import org.apache.flink.table.factories.BatchTableSourceFactory;
 import org.apache.flink.table.factories.CatalogFactory;
-import org.apache.flink.table.factories.StreamTableSinkFactory;
-import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.factories.ComponentFactoryService;
 import org.apache.flink.table.factories.TableFactoryService;
+import org.apache.flink.table.factories.TableSinkFactory;
+import org.apache.flink.table.factories.TableSourceFactory;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.FunctionService;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.planner.delegation.ExecutorBase;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.util.FlinkException;
@@ -74,6 +87,7 @@ import org.apache.flink.util.FlinkException;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
 
+import java.lang.reflect.Method;
 import java.net.URL;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -246,11 +260,11 @@ public class ExecutionContext<T> {
        }
 
        private static TableSource<?> createTableSource(ExecutionEntry 
execution, Map<String, String> sourceProperties, ClassLoader classLoader) {
-               if (execution.isStreamingExecution()) {
-                       final StreamTableSourceFactory<?> factory = 
(StreamTableSourceFactory<?>)
-                               
TableFactoryService.find(StreamTableSourceFactory.class, sourceProperties, 
classLoader);
-                       return 
factory.createStreamTableSource(sourceProperties);
-               } else if (execution.isBatchExecution()) {
+               if (execution.isStreamingPlanner()) {
+                       final TableSourceFactory<?> factory = 
(TableSourceFactory<?>)
+                               
TableFactoryService.find(TableSourceFactory.class, sourceProperties, 
classLoader);
+                       return factory.createTableSource(sourceProperties);
+               } else if (execution.isBatchPlanner()) {
                        final BatchTableSourceFactory<?> factory = 
(BatchTableSourceFactory<?>)
                                
TableFactoryService.find(BatchTableSourceFactory.class, sourceProperties, 
classLoader);
                        return factory.createBatchTableSource(sourceProperties);
@@ -259,11 +273,11 @@ public class ExecutionContext<T> {
        }
 
        private static TableSink<?> createTableSink(ExecutionEntry execution, 
Map<String, String> sinkProperties, ClassLoader classLoader) {
-               if (execution.isStreamingExecution()) {
-                       final StreamTableSinkFactory<?> factory = 
(StreamTableSinkFactory<?>)
-                               
TableFactoryService.find(StreamTableSinkFactory.class, sinkProperties, 
classLoader);
-                       return factory.createStreamTableSink(sinkProperties);
-               } else if (execution.isBatchExecution()) {
+               if (execution.isStreamingPlanner()) {
+                       final TableSinkFactory<?> factory = 
(TableSinkFactory<?>)
+                               
TableFactoryService.find(TableSinkFactory.class, sinkProperties, classLoader);
+                       return factory.createTableSink(sinkProperties);
+               } else if (execution.isBatchPlanner()) {
                        final BatchTableSinkFactory<?> factory = 
(BatchTableSinkFactory<?>)
                                
TableFactoryService.find(BatchTableSinkFactory.class, sinkProperties, 
classLoader);
                        return factory.createBatchTableSink(sinkProperties);
@@ -271,6 +285,53 @@ public class ExecutionContext<T> {
                throw new SqlExecutionException("Unsupported execution type for 
sinks.");
        }
 
+       private static TableEnvironment createStreamTableEnvironment(
+                       StreamExecutionEnvironment env,
+                       EnvironmentSettings settings,
+                       Executor executor) {
+
+               final TableConfig config = TableConfig.getDefault();
+
+               final CatalogManager catalogManager = new CatalogManager(
+                       settings.getBuiltInCatalogName(),
+                       new 
GenericInMemoryCatalog(settings.getBuiltInCatalogName(), 
settings.getBuiltInDatabaseName()));
+
+               final FunctionCatalog functionCatalog = new 
FunctionCatalog(catalogManager);
+
+               final Map<String, String> plannerProperties = 
settings.toPlannerProperties();
+               final Planner planner = 
ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
+                       .create(plannerProperties, executor, config, 
functionCatalog, catalogManager);
+
+               return new StreamTableEnvironmentImpl(
+                       catalogManager,
+                       functionCatalog,
+                       config,
+                       env,
+                       planner,
+                       executor,
+                       settings.isStreamingMode()
+               );
+       }
+
+       private static Executor lookupExecutor(
+                       Map<String, String> executorProperties,
+                       StreamExecutionEnvironment executionEnvironment) {
+               try {
+                       ExecutorFactory executorFactory = 
ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
+                       Method createMethod = executorFactory.getClass()
+                               .getMethod("create", Map.class, 
StreamExecutionEnvironment.class);
+
+                       return (Executor) createMethod.invoke(
+                               executorFactory,
+                               executorProperties,
+                               executionEnvironment);
+               } catch (Exception e) {
+                       throw new TableException(
+                               "Could not instantiate the executor. Make sure 
a planner module is on the classpath",
+                               e);
+               }
+       }
+
        // 
--------------------------------------------------------------------------------------------
 
        /**
@@ -283,17 +344,25 @@ public class ExecutionContext<T> {
                private final QueryConfig queryConfig;
                private final ExecutionEnvironment execEnv;
                private final StreamExecutionEnvironment streamExecEnv;
+               private final Executor executor;
                private final TableEnvironment tableEnv;
 
                private EnvironmentInstance() {
+                       // create settings
+                       final EnvironmentSettings settings = 
mergedEnv.getExecution().getEnvironmentSettings();
+
                        // create environments
-                       if (mergedEnv.getExecution().isStreamingExecution()) {
+                       if (mergedEnv.getExecution().isStreamingPlanner()) {
                                streamExecEnv = 
createStreamExecutionEnvironment();
                                execEnv = null;
-                               tableEnv = 
StreamTableEnvironment.create(streamExecEnv);
-                       } else if (mergedEnv.getExecution().isBatchExecution()) 
{
+
+                               final Map<String, String> executorProperties = 
settings.toExecutorProperties();
+                               executor = lookupExecutor(executorProperties, 
streamExecEnv);
+                               tableEnv = 
createStreamTableEnvironment(streamExecEnv, settings, executor);
+                       } else if (mergedEnv.getExecution().isBatchPlanner()) {
                                streamExecEnv = null;
                                execEnv = createExecutionEnvironment();
+                               executor = null;
                                tableEnv = 
BatchTableEnvironment.create(execEnv);
                        } else {
                                throw new SqlExecutionException("Unsupported 
execution type specified.");
@@ -378,6 +447,11 @@ public class ExecutionContext<T> {
 
                private FlinkPlan createPlan(String name, Configuration 
flinkConfig) {
                        if (streamExecEnv != null) {
+                               // special case for Blink planner to apply 
batch optimizations
+                               // note: it also modifies the ExecutionConfig!
+                               if (executor instanceof ExecutorBase) {
+                                       return ((ExecutorBase) 
executor).generateStreamGraph(name);
+                               }
                                return streamExecEnv.getStreamGraph(name);
                        } else {
                                final int parallelism = 
execEnv.getParallelism();
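
The wiring above (CatalogManager, FunctionCatalog, planner and executor discovery via ComponentFactoryService) roughly mirrors what the public factory methods do. Outside the SQL Client, the same planner selection is reachable through the regular API; a sketch, assuming the standard Flink 1.9 Table API (the class name BlinkStreamEnvironmentSketch is illustrative):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.java.StreamTableEnvironment;

    public final class BlinkStreamEnvironmentSketch {
        public static void main(String[] args) {
            // Explicit StreamExecutionEnvironment, as used by the SQL Client's EnvironmentInstance.
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Equivalent of "execution: { planner: blink, type: streaming }".
            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

            // Discovers the Blink planner and executor from the classpath,
            // much like ExecutionContext#createStreamTableEnvironment does internally.
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
            // tableEnv now plans table programs with the Blink planner.
        }
    }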
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 101f72c..5d91964 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -40,7 +40,6 @@ import org.apache.flink.table.api.StreamQueryConfig;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.internal.TableEnvImpl;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.calcite.FlinkTypeFactory;
 import org.apache.flink.table.client.SqlClientException;
@@ -302,7 +301,7 @@ public class LocalExecutor implements Executor {
 
                try {
                        return context.wrapClassLoader(() ->
-                               Arrays.asList(((TableEnvImpl) 
tableEnv).getCompletionHints(statement, position)));
+                               
Arrays.asList(tableEnv.getCompletionHints(statement, position)));
                } catch (Throwable t) {
                        // catch everything such that the query does not crash 
the executor
                        if (LOG.isDebugEnabled()) {
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
index dba7ed6..c3cc12b 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ResultStore.java
@@ -61,7 +61,7 @@ public class ResultStore {
 
                final RowTypeInfo outputType = new 
RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames());
 
-               if (env.getExecution().isStreamingExecution()) {
+               if (env.getExecution().inStreamingMode()) {
                        // determine gateway address (and port if possible)
                        final InetAddress gatewayAddress = 
getGatewayAddress(env.getDeployment());
                        final int gatewayPort = 
getGatewayPort(env.getDeployment());
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java
index 4cddb53..2ad69e4 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java
@@ -51,6 +51,7 @@ public class EnvironmentTest {
        @Test
        public void testMerging() throws Exception {
                final Map<String, String> replaceVars1 = new HashMap<>();
+               replaceVars1.put("$VAR_PLANNER", "old");
                replaceVars1.put("$VAR_EXECUTION_TYPE", "batch");
                replaceVars1.put("$VAR_RESULT_MODE", "table");
                replaceVars1.put("$VAR_UPDATE_MODE", "");
@@ -76,7 +77,7 @@ public class EnvironmentTest {
                tables.add("TestView2");
 
                assertEquals(tables, merged.getTables().keySet());
-               assertTrue(merged.getExecution().isStreamingExecution());
+               assertTrue(merged.getExecution().inStreamingMode());
                assertEquals(16, merged.getExecution().getMaxParallelism());
        }
 
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
index fb0c80c..aaa8321 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
@@ -247,6 +247,7 @@ public class ExecutionContextTest {
 
        private <T> ExecutionContext<T> createDefaultExecutionContext() throws 
Exception {
                final Map<String, String> replaceVars = new HashMap<>();
+               replaceVars.put("$VAR_PLANNER", "old");
                replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
                replaceVars.put("$VAR_RESULT_MODE", "changelog");
                replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
@@ -256,6 +257,7 @@ public class ExecutionContextTest {
 
        private <T> ExecutionContext<T> createCatalogExecutionContext() throws 
Exception {
                final Map<String, String> replaceVars = new HashMap<>();
+               replaceVars.put("$VAR_PLANNER", "old");
                replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
                replaceVars.put("$VAR_RESULT_MODE", "changelog");
                replaceVars.put("$VAR_UPDATE_MODE", "update-mode: append");
diff --git 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 5dbec41..838b95a 100644
--- 
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ 
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -34,6 +34,7 @@ import 
org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.client.config.Environment;
+import org.apache.flink.table.client.config.entries.ExecutionEntry;
 import org.apache.flink.table.client.config.entries.ViewEntry;
 import org.apache.flink.table.client.gateway.Executor;
 import org.apache.flink.table.client.gateway.ProgramTargetDescriptor;
@@ -52,6 +53,10 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
 
 import java.io.File;
 import java.io.IOException;
@@ -75,8 +80,16 @@ import static org.junit.Assert.fail;
 /**
  * Contains basic tests for the {@link LocalExecutor}.
  */
+@RunWith(Parameterized.class)
 public class LocalExecutorITCase extends TestLogger {
 
+       @Parameters(name = "Planner: {0}")
+       public static List<String> planner() {
+               return Arrays.asList(
+                       ExecutionEntry.EXECUTION_PLANNER_VALUE_OLD,
+                       ExecutionEntry.EXECUTION_PLANNER_VALUE_BLINK);
+       }
+
        private static final String DEFAULTS_ENVIRONMENT_FILE = 
"test-sql-client-defaults.yaml";
        private static final String CATALOGS_ENVIRONMENT_FILE = 
"test-sql-client-catalogs.yaml";
 
@@ -110,6 +123,9 @@ public class LocalExecutorITCase extends TestLogger {
                return config;
        }
 
+       @Parameter
+       public String planner;
+
        @Test
        public void testValidateSession() throws Exception {
                final Executor executor = createDefaultExecutor(clusterClient);
@@ -174,7 +190,7 @@ public class LocalExecutorITCase extends TestLogger {
 
                final List<String> actualDatabases = 
executor.listDatabases(session);
 
-               final List<String> expectedDatabases = 
Arrays.asList("default_database");
+               final List<String> expectedDatabases = 
Collections.singletonList("default_database");
                assertEquals(expectedDatabases, actualDatabases);
        }
 
@@ -220,6 +236,7 @@ public class LocalExecutorITCase extends TestLogger {
                final Map<String, String> actualProperties = 
executor.getSessionProperties(session);
 
                final Map<String, String> expectedProperties = new HashMap<>();
+               expectedProperties.put("execution.planner", planner);
                expectedProperties.put("execution.type", "batch");
                expectedProperties.put("execution.time-characteristic", 
"event-time");
                
expectedProperties.put("execution.periodic-watermarks-interval", "99");
@@ -275,6 +292,7 @@ public class LocalExecutorITCase extends TestLogger {
                final URL url = 
getClass().getClassLoader().getResource("test-data.csv");
                Objects.requireNonNull(url);
                final Map<String, String> replaceVars = new HashMap<>();
+               replaceVars.put("$VAR_PLANNER", planner);
                replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
                replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
                replaceVars.put("$VAR_RESULT_MODE", "changelog");
@@ -315,6 +333,7 @@ public class LocalExecutorITCase extends TestLogger {
                Objects.requireNonNull(url);
 
                final Map<String, String> replaceVars = new HashMap<>();
+               replaceVars.put("$VAR_PLANNER", planner);
                replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
                replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
                replaceVars.put("$VAR_RESULT_MODE", "table");
@@ -340,6 +359,7 @@ public class LocalExecutorITCase extends TestLogger {
                Objects.requireNonNull(url);
 
                final Map<String, String> replaceVars = new HashMap<>();
+               replaceVars.put("$VAR_PLANNER", planner);
                replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
                replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
                replaceVars.put("$VAR_RESULT_MODE", "table");
@@ -359,6 +379,7 @@ public class LocalExecutorITCase extends TestLogger {
                final URL url = 
getClass().getClassLoader().getResource("test-data.csv");
                Objects.requireNonNull(url);
                final Map<String, String> replaceVars = new HashMap<>();
+               replaceVars.put("$VAR_PLANNER", planner);
                replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
                replaceVars.put("$VAR_EXECUTION_TYPE", "batch");
                replaceVars.put("$VAR_RESULT_MODE", "table");
@@ -395,6 +416,7 @@ public class LocalExecutorITCase extends TestLogger {
                final URL url = 
getClass().getClassLoader().getResource("test-data.csv");
                Objects.requireNonNull(url);
                final Map<String, String> replaceVars = new HashMap<>();
+               replaceVars.put("$VAR_PLANNER", planner);
                replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
                replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
                replaceVars.put("$VAR_SOURCE_SINK_PATH", csvOutputPath);
@@ -454,6 +476,7 @@ public class LocalExecutorITCase extends TestLogger {
                final URL url = 
getClass().getClassLoader().getResource("test-data.csv");
                Objects.requireNonNull(url);
                final Map<String, String> replaceVars = new HashMap<>();
+               replaceVars.put("$VAR_PLANNER", planner);
                replaceVars.put("$VAR_SOURCE_PATH1", url.getPath());
                replaceVars.put("$VAR_EXECUTION_TYPE", "streaming");
                replaceVars.put("$VAR_SOURCE_SINK_PATH", csvOutputPath);
@@ -519,6 +542,7 @@ public class LocalExecutorITCase extends TestLogger {
 
        private <T> LocalExecutor createDefaultExecutor(ClusterClient<T> 
clusterClient) throws Exception {
                final Map<String, String> replaceVars = new HashMap<>();
+               replaceVars.put("$VAR_PLANNER", planner);
                replaceVars.put("$VAR_EXECUTION_TYPE", "batch");
                replaceVars.put("$VAR_UPDATE_MODE", "");
                replaceVars.put("$VAR_MAX_ROWS", "100");
diff --git 
a/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml 
b/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
index 9844d54..d3e917d 100644
--- 
a/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
+++ 
b/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
@@ -115,7 +115,15 @@ functions:
       - type: LONG
         value: 5
 
+catalogs:
+  - name: catalog1
+    type: DependencyTest
+  - name: simple-catalog
+    type: simple-catalog
+    test-table: test-table
+
 execution:
+  planner: "$VAR_PLANNER"
   type: "$VAR_EXECUTION_TYPE"
   time-characteristic: event-time
   periodic-watermarks-interval: 99
@@ -133,10 +141,3 @@ execution:
 
 deployment:
   response-timeout: 5000
-
-catalogs:
-  - name: catalog1
-    type: DependencyTest
-  - name: simple-catalog
-    type: simple-catalog
-    test-table: test-table
diff --git a/tools/travis/splits/split_misc.sh b/tools/travis/splits/split_misc.sh
index 35c8465..60eb91b 100755
--- a/tools/travis/splits/split_misc.sh
+++ b/tools/travis/splits/split_misc.sh
@@ -67,7 +67,8 @@ run_test "Avro Confluent Schema Registry nightly end-to-end test" "$END_TO_END_D
 run_test "State TTL Heap backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh file"
 run_test "State TTL RocksDb backend end-to-end test" "$END_TO_END_DIR/test-scripts/test_stream_state_ttl.sh rocks"
 
-run_test "SQL Client end-to-end test" "$END_TO_END_DIR/test-scripts/test_sql_client.sh"
+run_test "SQL Client end-to-end test (Old planner)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh old"
+run_test "SQL Client end-to-end test (Blink planner)" "$END_TO_END_DIR/test-scripts/test_sql_client.sh blink"
 run_test "SQL Client end-to-end test for Kafka 0.10" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka010.sh"
 run_test "SQL Client end-to-end test for Kafka 0.11" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka011.sh"
 run_test "SQL Client end-to-end test for modern Kafka" "$END_TO_END_DIR/test-scripts/test_sql_client_kafka.sh"
