This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 44009efc540 [FLINK-29219][table] Fix CREATE TABLE AS statement blocks 
SQL client's execution
44009efc540 is described below

commit 44009efc5400c6706563fa53335eccf445cf5d0c
Author: Ron <ldliu...@163.com>
AuthorDate: Tue Sep 27 09:52:14 2022 +0800

    [FLINK-29219][table] Fix CREATE TABLE AS statement blocks SQL client's 
execution
    
    This closes #20869
---
 .../hive/parse/HiveParserDDLSemanticAnalyzer.java  |  2 +-
 .../table/sql/codegen/CreateTableAsITCase.java     | 68 ++++++++++++++++++++++
 .../src/test/resources/create_table_as_e2e.sql     | 36 ++++++++++++
 .../resources/create_table_as_statementset_e2e.sql | 40 +++++++++++++
 .../apache/flink/table/client/cli/CliClient.java   |  9 ++-
 .../flink-sql-client/src/test/resources/sql/set.q  | 21 ++++---
 .../gateway/AbstractSqlGatewayStatementITCase.java |  3 +
 .../src/test/resources/sql/insert.q                | 53 +++++++++++++++++
 .../table/api/internal/TableEnvironmentImpl.java   | 26 ++++++---
 .../{ddl => }/CreateTableASOperation.java          | 21 ++++---
 .../table/operations/ModifyOperationVisitor.java   |  2 +
 .../operations/SqlCreateTableConverter.java        |  2 +-
 .../apache/flink/table/api/CompiledPlanITCase.java | 24 ++++++++
 .../planner/plan/stream/sql/TableSinkTest.scala    |  4 +-
 .../runtime/batch/sql/TableSinkITCase.scala        | 16 +++++
 .../runtime/stream/sql/TableSinkITCase.scala       | 17 ++++++
 16 files changed, 316 insertions(+), 28 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
index 3ef4f1ba879..799cd09ad6d 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
@@ -60,6 +60,7 @@ import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.hive.HiveFunctionWrapper;
 import org.apache.flink.table.functions.hive.HiveGenericUDF;
+import org.apache.flink.table.operations.CreateTableASOperation;
 import org.apache.flink.table.operations.DescribeTableOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
@@ -80,7 +81,6 @@ import 
org.apache.flink.table.operations.ddl.AlterViewPropertiesOperation;
 import org.apache.flink.table.operations.ddl.AlterViewRenameOperation;
 import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
 import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
-import org.apache.flink.table.operations.ddl.CreateTableASOperation;
 import org.apache.flink.table.operations.ddl.CreateTableOperation;
 import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
 import org.apache.flink.table.operations.ddl.CreateViewOperation;
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/CreateTableAsITCase.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/CreateTableAsITCase.java
new file mode 100644
index 00000000000..1204842547b
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/CreateTableAsITCase.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sql.codegen;
+
+import org.apache.flink.test.util.SQLJobSubmission;
+import org.apache.flink.tests.util.flink.ClusterController;
+
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+
+/** End to End tests for create table as select syntax. */
+public class CreateTableAsITCase extends SqlITCaseBase {
+
+    public CreateTableAsITCase(String executionMode) {
+        super(executionMode);
+    }
+
+    @Test
+    public void testCreateTableAs() throws Exception {
+        runAndCheckSQL(
+                "create_table_as_e2e.sql",
+                generateReplaceVars(),
+                2,
+                Arrays.asList(
+                        
"{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
+                        
"{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+    }
+
+    @Test
+    public void testCreateTableAsInStatementSet() throws Exception {
+        runAndCheckSQL(
+                "create_table_as_statementset_e2e.sql",
+                generateReplaceVars(),
+                2,
+                Arrays.asList(
+                        
"{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
+                        
"{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
+    }
+
+    @Override
+    protected void executeSqlStatements(ClusterController clusterController, 
List<String> sqlLines)
+            throws Exception {
+        clusterController.submitSQLJob(
+                new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
+                        .addJar(SQL_TOOL_BOX_JAR)
+                        .build(),
+                Duration.ofMinutes(2L));
+    }
+}
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/create_table_as_e2e.sql
 
b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/create_table_as_e2e.sql
new file mode 100644
index 00000000000..b9316e1096a
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/create_table_as_e2e.sql
@@ -0,0 +1,36 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+CREATE TEMPORARY FUNCTION count_agg AS 
'org.apache.flink.table.toolbox.CountAggFunction';
+
+SET execution.runtime-mode = $MODE;
+SET table.exec.mini-batch.enabled = true;
+SET table.exec.mini-batch.size = 5;
+SET table.exec.mini-batch.allow-latency = 2s;
+
+CREATE TABLE JsonTable
+WITH (
+  'connector' = 'filesystem',
+  'path' = '$RESULT',
+  'sink.rolling-policy.rollover-interval' = '2s',
+  'sink.rolling-policy.check-interval' = '2s',
+  'format' = 'debezium-json'
+)
+AS SELECT user_name, count_agg(order_id) AS order_cnt
+FROM (VALUES (1, 'Bob'), (2, 'Bob'), (1, 'Alice')) T(order_id, user_name)
+GROUP BY user_name;
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/create_table_as_statementset_e2e.sql
 
b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/create_table_as_statementset_e2e.sql
new file mode 100644
index 00000000000..deedf8a0eb3
--- /dev/null
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/create_table_as_statementset_e2e.sql
@@ -0,0 +1,40 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+CREATE TEMPORARY FUNCTION count_agg AS 
'org.apache.flink.table.toolbox.CountAggFunction';
+
+SET execution.runtime-mode = $MODE;
+SET table.exec.mini-batch.enabled = true;
+SET table.exec.mini-batch.size = 5;
+SET table.exec.mini-batch.allow-latency = 2s;
+
+BEGIN STATEMENT SET;
+
+CREATE TABLE JsonTable
+WITH (
+    'connector' = 'filesystem',
+    'path' = '$RESULT',
+    'sink.rolling-policy.rollover-interval' = '2s',
+    'sink.rolling-policy.check-interval' = '2s',
+    'format' = 'debezium-json'
+)
+AS SELECT user_name, count_agg(order_id) AS order_cnt
+   FROM (VALUES (1, 'Bob'), (2, 'Bob'), (1, 'Alice')) T(order_id, user_name)
+   GROUP BY user_name;
+
+END;
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
index 1bc2b84caf0..f6ade74ae70 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.client.gateway.Executor;
 import org.apache.flink.table.client.gateway.ResultDescriptor;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
 import org.apache.flink.table.operations.BeginStatementSetOperation;
+import org.apache.flink.table.operations.CreateTableASOperation;
 import org.apache.flink.table.operations.EndStatementSetOperation;
 import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.LoadModuleOperation;
@@ -410,7 +411,8 @@ public class CliClient implements AutoCloseable {
         // check the current operation is allowed in STATEMENT SET.
         if (isStatementSetMode) {
             if (!(operation instanceof SinkModifyOperation
-                    || operation instanceof EndStatementSetOperation)) {
+                    || operation instanceof EndStatementSetOperation
+                    || operation instanceof CreateTableASOperation)) {
                 // It's up to invoker of the executeStatement to determine 
whether to continue
                 // execution
                 throw new 
SqlExecutionException(MESSAGE_STATEMENT_SET_SQL_EXECUTION_ERROR);
@@ -463,6 +465,9 @@ public class CliClient implements AutoCloseable {
         } else if (operation instanceof ShowCreateViewOperation) {
             // SHOW CREATE VIEW
             callShowCreateView((ShowCreateViewOperation) operation);
+        } else if (operation instanceof CreateTableASOperation) {
+            // CTAS
+            callInsert((CreateTableASOperation) operation);
         } else {
             // fallback to default implementation
             executeOperation(operation);
@@ -557,7 +562,7 @@ public class CliClient implements AutoCloseable {
         }
     }
 
-    private void callInsert(SinkModifyOperation operation) {
+    private void callInsert(ModifyOperation operation) {
         if (isStatementSetMode) {
             statementSetOperations.add(operation);
             printInfo(CliStrings.MESSAGE_ADD_STATEMENT_TO_STATEMENT_SET);
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/set.q 
b/flink-table/flink-sql-client/src/test/resources/sql/set.q
index a450c4e9b42..58172fe4912 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/set.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/set.q
@@ -74,14 +74,21 @@ CREATE TABLE hive_table (
 [INFO] Execute statement succeed.
 !info
 
-# test "ctas" only supported in Hive Dialect
+# test "ctas" in Hive Dialect
 CREATE TABLE foo as select 1;
-+-------------------------+
-| hivecatalog.default.foo |
-+-------------------------+
-|                      -1 |
-+-------------------------+
-1 row in set
+[INFO] Submitting SQL update statement to the cluster...
+[INFO] SQL update statement has been successfully submitted to the cluster:
+Job ID:
+
+!info
+
+SELECT * from foo;
++----+-------------+
+| op |      _o__c0 |
++----+-------------+
+| +I |           1 |
++----+-------------+
+Received a total of 1 row
 !ok
 
 # test add jar
diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java
index 682ff67bc1a..168f1e8674f 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java
@@ -103,6 +103,9 @@ public abstract class AbstractSqlGatewayStatementITCase 
extends AbstractTestBase
         replaceVars.put(
                 "$VAR_BATCH_PATH",
                 
Files.createDirectory(temporaryFolder.resolve("batch")).toFile().getPath());
+        replaceVars.put(
+                "$VAR_BATCH_CTAS_PATH",
+                
Files.createDirectory(temporaryFolder.resolve("batch_ctas")).toFile().getPath());
     }
 
     @ParameterizedTest
diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q 
b/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q
index 2348b1f57a0..295b822cff0 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q
@@ -180,3 +180,56 @@ SELECT * FROM BatchTable;
 +----+-------------+
 7 rows in set
 !ok
+
+# test only to verify the test job id.
+SET '$internal.pipeline.job-id' = '84c7408a08c284d5736e50d3f5a648be';
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+CREATE TABLE CtasTable
+WITH (
+  'connector' = 'filesystem',
+  'path' = '$VAR_BATCH_CTAS_PATH',
+  'format' = 'csv'
+)
+AS SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 
'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE')) T(id, str);
+!output
++----------------------------------+
+|                           job id |
++----------------------------------+
+| 84c7408a08c284d5736e50d3f5a648be |
++----------------------------------+
+1 row in set
+!ok
+
+RESET '$internal.pipeline.job-id';
+!output
++--------+
+| result |
++--------+
+|     OK |
++--------+
+1 row in set
+!ok
+
+SELECT * FROM CtasTable;
+!output
++----+-------------+
+| id |         str |
++----+-------------+
+|  1 | Hello World |
+|  2 |          Hi |
+|  2 |          Hi |
+|  3 |       Hello |
+|  3 |       World |
+|  4 |         ADD |
+|  5 |        LINE |
++----+-------------+
+7 rows in set
+!ok
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index c61cdb05fd6..caf1175c680 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -90,6 +90,7 @@ import org.apache.flink.table.module.ModuleEntry;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.operations.CollectModifyOperation;
 import org.apache.flink.table.operations.CompileAndExecutePlanOperation;
+import org.apache.flink.table.operations.CreateTableASOperation;
 import org.apache.flink.table.operations.DescribeTableOperation;
 import org.apache.flink.table.operations.ExplainOperation;
 import org.apache.flink.table.operations.LoadModuleOperation;
@@ -139,7 +140,6 @@ import 
org.apache.flink.table.operations.ddl.CompilePlanOperation;
 import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
 import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
 import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
-import org.apache.flink.table.operations.ddl.CreateTableASOperation;
 import org.apache.flink.table.operations.ddl.CreateTableOperation;
 import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
 import org.apache.flink.table.operations.ddl.CreateViewOperation;
@@ -779,7 +779,9 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
     public CompiledPlan compilePlanSql(String stmt) {
         List<Operation> operations = getParser().parse(stmt);
 
-        if (operations.size() != 1 || !(operations.get(0) instanceof 
ModifyOperation)) {
+        if (operations.size() != 1
+                || !(operations.get(0) instanceof ModifyOperation)
+                || operations.get(0) instanceof CreateTableASOperation) {
             throw new 
TableException(UNSUPPORTED_QUERY_IN_COMPILE_PLAN_SQL_MSG);
         }
 
@@ -839,8 +841,20 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
 
     @Override
     public TableResultInternal executeInternal(List<ModifyOperation> 
operations) {
-        List<Transformation<?>> transformations = translate(operations);
-        List<String> sinkIdentifierNames = 
extractSinkIdentifierNames(operations);
+        List<ModifyOperation> mapOperations = new ArrayList<>();
+        for (ModifyOperation modify : operations) {
+            // execute CREATE TABLE first for CTAS statements
+            if (modify instanceof CreateTableASOperation) {
+                CreateTableASOperation ctasOperation = 
(CreateTableASOperation) modify;
+                executeInternal(ctasOperation.getCreateTableOperation());
+                
mapOperations.add(ctasOperation.toSinkModifyOperation(catalogManager));
+            } else {
+                mapOperations.add(modify);
+            }
+        }
+
+        List<Transformation<?>> transformations = translate(mapOperations);
+        List<String> sinkIdentifierNames = 
extractSinkIdentifierNames(mapOperations);
         TableResultInternal result = executeInternal(transformations, 
sinkIdentifierNames);
         if (tableConfig.get(TABLE_DML_SYNC)) {
             try {
@@ -1397,10 +1411,6 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
             }
         } else if (operation instanceof QueryOperation) {
             return executeQueryOperation((QueryOperation) operation);
-        } else if (operation instanceof CreateTableASOperation) {
-            CreateTableASOperation createTableASOperation = 
(CreateTableASOperation) operation;
-            executeInternal(createTableASOperation.getCreateTableOperation());
-            return 
executeInternal(createTableASOperation.toSinkModifyOperation(catalogManager));
         } else if (operation instanceof ExecutePlanOperation) {
             ExecutePlanOperation executePlanOperation = (ExecutePlanOperation) 
operation;
             return (TableResultInternal)
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTableASOperation.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CreateTableASOperation.java
similarity index 88%
rename from 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTableASOperation.java
rename to 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CreateTableASOperation.java
index 53092497fc6..ba59dfe5465 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTableASOperation.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CreateTableASOperation.java
@@ -16,14 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.operations.ddl;
+package org.apache.flink.table.operations;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.catalog.CatalogManager;
-import org.apache.flink.table.operations.Operation;
-import org.apache.flink.table.operations.OperationUtils;
-import org.apache.flink.table.operations.QueryOperation;
-import org.apache.flink.table.operations.SinkModifyOperation;
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
@@ -31,7 +28,7 @@ import java.util.Map;
 
 /** Operation to describe a CREATE TABLE AS statement. */
 @Internal
-public class CreateTableASOperation implements CreateOperation {
+public class CreateTableASOperation implements ModifyOperation {
 
     private final CreateTableOperation createTableOperation;
 
@@ -66,7 +63,7 @@ public class CreateTableASOperation implements 
CreateOperation {
     @Override
     public String asSummaryString() {
         Map<String, Object> params = new LinkedHashMap<>();
-        params.put("catalogTable", 
getCreateTableOperation().getCatalogTable().toProperties());
+        params.put("catalogTable", 
getCreateTableOperation().getCatalogTable());
         params.put("identifier", 
getCreateTableOperation().getTableIdentifier());
         params.put("ignoreIfExists", 
getCreateTableOperation().isIgnoreIfExists());
         params.put("isTemporary", getCreateTableOperation().isTemporary());
@@ -79,4 +76,14 @@ public class CreateTableASOperation implements 
CreateOperation {
                 Collections.singletonList(sinkModifyQuery),
                 Operation::asSummaryString);
     }
+
+    @Override
+    public QueryOperation getChild() {
+        return sinkModifyQuery;
+    }
+
+    @Override
+    public <T> T accept(ModifyOperationVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyOperationVisitor.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyOperationVisitor.java
index 09c222ac529..98ab9d60f27 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyOperationVisitor.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyOperationVisitor.java
@@ -35,4 +35,6 @@ public interface ModifyOperationVisitor<T> {
     <U> T visit(UnregisteredSinkModifyOperation<U> unregisteredSink);
 
     T visit(CollectModifyOperation selectOperation);
+
+    T visit(CreateTableASOperation ctas);
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
index 113d42e2e8a..a24fb689181 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
@@ -34,8 +34,8 @@ import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ContextResolvedTable;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.operations.CreateTableASOperation;
 import org.apache.flink.table.operations.Operation;
-import org.apache.flink.table.operations.ddl.CreateTableASOperation;
 import org.apache.flink.table.operations.ddl.CreateTableOperation;
 import org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator;
 import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
index d2ae077e132..887d8a8d791 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java
@@ -99,6 +99,30 @@ public class CompiledPlanITCase extends JsonPlanTestBase {
         assertResult(DATA, sinkPath);
     }
 
+    @Test
+    public void testExecuteCtasPlanSql() throws Exception {
+        createTestCsvSourceTable("src", DATA, COLUMNS_DEFINITION);
+
+        File sinkPath = TEMPORARY_FOLDER.newFolder();
+        assertThatThrownBy(
+                        () ->
+                                tableEnv.compilePlanSql(
+                                                String.format(
+                                                        "CREATE TABLE sink\n"
+                                                                + "WITH (\n"
+                                                                + "  
'connector' = 'filesystem',\n"
+                                                                + "  'format' 
= 'testcsv',\n"
+                                                                + "  'path' = 
'%s'\n"
+                                                                + ") AS SELECT 
* FROM src",
+                                                        
sinkPath.getAbsolutePath()))
+                                        .execute())
+                .satisfies(
+                        anyCauseMatches(
+                                TableException.class,
+                                "Unsupported SQL query! compilePlanSql() only 
accepts a single SQL statement"
+                                        + " of type INSERT"));
+    }
+
     @Test
     public void testExecutePlanTable() throws Exception {
         File sinkPath = createSourceSinkTables();
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
index d22c62fe26a..2b3a0c7f1e7 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala
@@ -806,8 +806,8 @@ class TableSinkTest extends TableTestBase {
     Assertions
       .assertThatThrownBy(
         () => util.tableEnv.explainSql("CREATE TABLE zm_ctas_test AS SELECT * 
FROM MyTable"))
-      .hasMessage(
-        "Unsupported operation: 
org.apache.flink.table.operations.ddl.CreateTableASOperation")
+      .hasMessageContaining(
+        "Unsupported ModifyOperation: 
org.apache.flink.table.operations.CreateTableASOperation")
   }
 }
 
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala
index 6ea965191b9..b4224519faa 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala
@@ -125,6 +125,22 @@ class TableSinkITCase extends BatchTestBase {
     val expected = Seq("1,1,Hi", "2,2,Hello", "3,2,Hello world")
     val result = TableTestUtil.readFromFile(resultPath)
     Assertions.assertThat(result.sorted).isEqualTo(expected.sorted)
+
+    // test statement set
+    val statementSet = tEnv.createStatementSet()
+    val useStatementResultPath = 
BatchAbstractTestBase.TEMPORARY_FOLDER.newFolder().getAbsolutePath
+    statementSet.addInsertSql(s"""
+                                 |CREATE TABLE MyCtasTableUseStatement
+                                 | WITH (
+                                 |  'connector' = 'filesystem',
+                                 |  'format' = 'testcsv',
+                                 |  'path' = '$useStatementResultPath'
+                                 |) AS
+                                 | SELECT * FROM MyTable
+                                 |""".stripMargin)
+    statementSet.execute().await()
+    val useStatementResult = TableTestUtil.readFromFile(useStatementResultPath)
+    Assertions.assertThat(useStatementResult.sorted).isEqualTo(expected.sorted)
   }
 
   @Test
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
index 5dc7bc70e53..9885bd6a62c 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
@@ -280,6 +280,23 @@ class TableSinkITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase
       "+I[jason, 1]"
     )
     Assertions.assertThat(actual.sorted).isEqualTo(expected.sorted)
+    // test statement set
+    val statementSet = tEnv.createStatementSet()
+    statementSet.addInsertSql("""
+                                |CREATE TABLE MyCtasTableUseStatement
+                                | WITH (
+                                |   'connector' = 'values',
+                                |   'sink-insert-only' = 'true'
+                                |) AS
+                                |  SELECT
+                                |    `person`,
+                                |    `votes`
+                                |  FROM
+                                |    src
+                                |""".stripMargin)
+    statementSet.execute().await()
+    val actualUseStatement = 
TestValuesTableFactory.getResults("MyCtasTableUseStatement")
+    Assertions.assertThat(actualUseStatement.sorted).isEqualTo(expected.sorted)
   }
 
   @Test

Reply via email to