This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push: new 44009efc540 [FLINK-29219][table] Fix CREATE TABLE AS statement blocks SQL client's execution 44009efc540 is described below commit 44009efc5400c6706563fa53335eccf445cf5d0c Author: Ron <ldliu...@163.com> AuthorDate: Tue Sep 27 09:52:14 2022 +0800 [FLINK-29219][table] Fix CREATE TABLE AS statement blocks SQL client's execution This closes #20869 --- .../hive/parse/HiveParserDDLSemanticAnalyzer.java | 2 +- .../table/sql/codegen/CreateTableAsITCase.java | 68 ++++++++++++++++++++++ .../src/test/resources/create_table_as_e2e.sql | 36 ++++++++++++ .../resources/create_table_as_statementset_e2e.sql | 40 +++++++++++++ .../apache/flink/table/client/cli/CliClient.java | 9 ++- .../flink-sql-client/src/test/resources/sql/set.q | 21 ++++--- .../gateway/AbstractSqlGatewayStatementITCase.java | 3 + .../src/test/resources/sql/insert.q | 53 +++++++++++++++++ .../table/api/internal/TableEnvironmentImpl.java | 26 ++++++--- .../{ddl => }/CreateTableASOperation.java | 21 ++++--- .../table/operations/ModifyOperationVisitor.java | 2 + .../operations/SqlCreateTableConverter.java | 2 +- .../apache/flink/table/api/CompiledPlanITCase.java | 24 ++++++++ .../planner/plan/stream/sql/TableSinkTest.scala | 4 +- .../runtime/batch/sql/TableSinkITCase.scala | 16 +++++ .../runtime/stream/sql/TableSinkITCase.scala | 17 ++++++ 16 files changed, 316 insertions(+), 28 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java index 3ef4f1ba879..799cd09ad6d 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java +++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java @@ -60,6 +60,7 @@ import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.functions.FunctionDefinition; import org.apache.flink.table.functions.hive.HiveFunctionWrapper; import org.apache.flink.table.functions.hive.HiveGenericUDF; +import org.apache.flink.table.operations.CreateTableASOperation; import org.apache.flink.table.operations.DescribeTableOperation; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.QueryOperation; @@ -80,7 +81,6 @@ import org.apache.flink.table.operations.ddl.AlterViewPropertiesOperation; import org.apache.flink.table.operations.ddl.AlterViewRenameOperation; import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation; import org.apache.flink.table.operations.ddl.CreateDatabaseOperation; -import org.apache.flink.table.operations.ddl.CreateTableASOperation; import org.apache.flink.table.operations.ddl.CreateTableOperation; import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation; import org.apache.flink.table.operations.ddl.CreateViewOperation; diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/CreateTableAsITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/CreateTableAsITCase.java new file mode 100644 index 00000000000..1204842547b --- /dev/null +++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/CreateTableAsITCase.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.sql.codegen; + +import org.apache.flink.test.util.SQLJobSubmission; +import org.apache.flink.tests.util.flink.ClusterController; + +import org.junit.Test; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; + +/** End to End tests for create table as select syntax. */ +public class CreateTableAsITCase extends SqlITCaseBase { + + public CreateTableAsITCase(String executionMode) { + super(executionMode); + } + + @Test + public void testCreateTableAs() throws Exception { + runAndCheckSQL( + "create_table_as_e2e.sql", + generateReplaceVars(), + 2, + Arrays.asList( + "{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}", + "{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}")); + } + + @Test + public void testCreateTableAsInStatementSet() throws Exception { + runAndCheckSQL( + "create_table_as_statementset_e2e.sql", + generateReplaceVars(), + 2, + Arrays.asList( + "{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}", + "{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}")); + } + + @Override + protected void executeSqlStatements(ClusterController clusterController, List<String> sqlLines) + throws Exception { + clusterController.submitSQLJob( + new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines) + 
.addJar(SQL_TOOL_BOX_JAR) + .build(), + Duration.ofMinutes(2L)); + } +} diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/create_table_as_e2e.sql b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/create_table_as_e2e.sql new file mode 100644 index 00000000000..b9316e1096a --- /dev/null +++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/create_table_as_e2e.sql @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +CREATE TEMPORARY FUNCTION count_agg AS 'org.apache.flink.table.toolbox.CountAggFunction'; + +SET execution.runtime-mode = $MODE; +SET table.exec.mini-batch.enabled = true; +SET table.exec.mini-batch.size = 5; +SET table.exec.mini-batch.allow-latency = 2s; + +CREATE TABLE JsonTable +WITH ( + 'connector' = 'filesystem', + 'path' = '$RESULT', + 'sink.rolling-policy.rollover-interval' = '2s', + 'sink.rolling-policy.check-interval' = '2s', + 'format' = 'debezium-json' +) +AS SELECT user_name, count_agg(order_id) AS order_cnt +FROM (VALUES (1, 'Bob'), (2, 'Bob'), (1, 'Alice')) T(order_id, user_name) +GROUP BY user_name; diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/create_table_as_statementset_e2e.sql b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/create_table_as_statementset_e2e.sql new file mode 100644 index 00000000000..deedf8a0eb3 --- /dev/null +++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/create_table_as_statementset_e2e.sql @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +CREATE TEMPORARY FUNCTION count_agg AS 'org.apache.flink.table.toolbox.CountAggFunction'; + +SET execution.runtime-mode = $MODE; +SET table.exec.mini-batch.enabled = true; +SET table.exec.mini-batch.size = 5; +SET table.exec.mini-batch.allow-latency = 2s; + +BEGIN STATEMENT SET; + +CREATE TABLE JsonTable +WITH ( + 'connector' = 'filesystem', + 'path' = '$RESULT', + 'sink.rolling-policy.rollover-interval' = '2s', + 'sink.rolling-policy.check-interval' = '2s', + 'format' = 'debezium-json' +) +AS SELECT user_name, count_agg(order_id) AS order_cnt + FROM (VALUES (1, 'Bob'), (2, 'Bob'), (1, 'Alice')) T(order_id, user_name) + GROUP BY user_name; + +END; diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java index 1bc2b84caf0..f6ade74ae70 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java @@ -28,6 +28,7 @@ import org.apache.flink.table.client.gateway.Executor; import org.apache.flink.table.client.gateway.ResultDescriptor; import org.apache.flink.table.client.gateway.SqlExecutionException; import org.apache.flink.table.operations.BeginStatementSetOperation; +import org.apache.flink.table.operations.CreateTableASOperation; import org.apache.flink.table.operations.EndStatementSetOperation; import org.apache.flink.table.operations.ExplainOperation; import org.apache.flink.table.operations.LoadModuleOperation; @@ -410,7 +411,8 @@ public class CliClient implements AutoCloseable { // check the current operation is allowed in STATEMENT SET. 
if (isStatementSetMode) { if (!(operation instanceof SinkModifyOperation - || operation instanceof EndStatementSetOperation)) { + || operation instanceof EndStatementSetOperation + || operation instanceof CreateTableASOperation)) { // It's up to invoker of the executeStatement to determine whether to continue // execution throw new SqlExecutionException(MESSAGE_STATEMENT_SET_SQL_EXECUTION_ERROR); @@ -463,6 +465,9 @@ public class CliClient implements AutoCloseable { } else if (operation instanceof ShowCreateViewOperation) { // SHOW CREATE VIEW callShowCreateView((ShowCreateViewOperation) operation); + } else if (operation instanceof CreateTableASOperation) { + // CTAS + callInsert((CreateTableASOperation) operation); } else { // fallback to default implementation executeOperation(operation); @@ -557,7 +562,7 @@ public class CliClient implements AutoCloseable { } } - private void callInsert(SinkModifyOperation operation) { + private void callInsert(ModifyOperation operation) { if (isStatementSetMode) { statementSetOperations.add(operation); printInfo(CliStrings.MESSAGE_ADD_STATEMENT_TO_STATEMENT_SET); diff --git a/flink-table/flink-sql-client/src/test/resources/sql/set.q b/flink-table/flink-sql-client/src/test/resources/sql/set.q index a450c4e9b42..58172fe4912 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql/set.q +++ b/flink-table/flink-sql-client/src/test/resources/sql/set.q @@ -74,14 +74,21 @@ CREATE TABLE hive_table ( [INFO] Execute statement succeed. !info -# test "ctas" only supported in Hive Dialect +# test "ctas" in Hive Dialect CREATE TABLE foo as select 1; -+-------------------------+ -| hivecatalog.default.foo | -+-------------------------+ -| -1 | -+-------------------------+ -1 row in set +[INFO] Submitting SQL update statement to the cluster... 
+[INFO] SQL update statement has been successfully submitted to the cluster: +Job ID: + +!info + +SELECT * from foo; ++----+-------------+ +| op | _o__c0 | ++----+-------------+ +| +I | 1 | ++----+-------------+ +Received a total of 1 row !ok # test add jar diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java index 682ff67bc1a..168f1e8674f 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java @@ -103,6 +103,9 @@ public abstract class AbstractSqlGatewayStatementITCase extends AbstractTestBase replaceVars.put( "$VAR_BATCH_PATH", Files.createDirectory(temporaryFolder.resolve("batch")).toFile().getPath()); + replaceVars.put( + "$VAR_BATCH_CTAS_PATH", + Files.createDirectory(temporaryFolder.resolve("batch_ctas")).toFile().getPath()); } @ParameterizedTest diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q b/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q index 2348b1f57a0..295b822cff0 100644 --- a/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q +++ b/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q @@ -180,3 +180,56 @@ SELECT * FROM BatchTable; +----+-------------+ 7 rows in set !ok + +# test only to verify the test job id. 
+SET '$internal.pipeline.job-id' = '84c7408a08c284d5736e50d3f5a648be'; +!output ++--------+ +| result | ++--------+ +| OK | ++--------+ +1 row in set +!ok + +CREATE TABLE CtasTable +WITH ( + 'connector' = 'filesystem', + 'path' = '$VAR_BATCH_CTAS_PATH', + 'format' = 'csv' +) +AS SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE')) T(id, str); +!output ++----------------------------------+ +| job id | ++----------------------------------+ +| 84c7408a08c284d5736e50d3f5a648be | ++----------------------------------+ +1 row in set +!ok + +RESET '$internal.pipeline.job-id'; +!output ++--------+ +| result | ++--------+ +| OK | ++--------+ +1 row in set +!ok + +SELECT * FROM CtasTable; +!output ++----+-------------+ +| id | str | ++----+-------------+ +| 1 | Hello World | +| 2 | Hi | +| 2 | Hi | +| 3 | Hello | +| 3 | World | +| 4 | ADD | +| 5 | LINE | ++----+-------------+ +7 rows in set +!ok diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index c61cdb05fd6..caf1175c680 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -90,6 +90,7 @@ import org.apache.flink.table.module.ModuleEntry; import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.operations.CollectModifyOperation; import org.apache.flink.table.operations.CompileAndExecutePlanOperation; +import org.apache.flink.table.operations.CreateTableASOperation; import org.apache.flink.table.operations.DescribeTableOperation; import org.apache.flink.table.operations.ExplainOperation; import org.apache.flink.table.operations.LoadModuleOperation; @@ -139,7 +140,6 @@ import 
org.apache.flink.table.operations.ddl.CompilePlanOperation; import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation; import org.apache.flink.table.operations.ddl.CreateCatalogOperation; import org.apache.flink.table.operations.ddl.CreateDatabaseOperation; -import org.apache.flink.table.operations.ddl.CreateTableASOperation; import org.apache.flink.table.operations.ddl.CreateTableOperation; import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation; import org.apache.flink.table.operations.ddl.CreateViewOperation; @@ -779,7 +779,9 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { public CompiledPlan compilePlanSql(String stmt) { List<Operation> operations = getParser().parse(stmt); - if (operations.size() != 1 || !(operations.get(0) instanceof ModifyOperation)) { + if (operations.size() != 1 + || !(operations.get(0) instanceof ModifyOperation) + || operations.get(0) instanceof CreateTableASOperation) { throw new TableException(UNSUPPORTED_QUERY_IN_COMPILE_PLAN_SQL_MSG); } @@ -839,8 +841,20 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { @Override public TableResultInternal executeInternal(List<ModifyOperation> operations) { - List<Transformation<?>> transformations = translate(operations); - List<String> sinkIdentifierNames = extractSinkIdentifierNames(operations); + List<ModifyOperation> mapOperations = new ArrayList<>(); + for (ModifyOperation modify : operations) { + // execute CREATE TABLE first for CTAS statements + if (modify instanceof CreateTableASOperation) { + CreateTableASOperation ctasOperation = (CreateTableASOperation) modify; + executeInternal(ctasOperation.getCreateTableOperation()); + mapOperations.add(ctasOperation.toSinkModifyOperation(catalogManager)); + } else { + mapOperations.add(modify); + } + } + + List<Transformation<?>> transformations = translate(mapOperations); + List<String> sinkIdentifierNames = extractSinkIdentifierNames(mapOperations); 
TableResultInternal result = executeInternal(transformations, sinkIdentifierNames); if (tableConfig.get(TABLE_DML_SYNC)) { try { @@ -1397,10 +1411,6 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { } } else if (operation instanceof QueryOperation) { return executeQueryOperation((QueryOperation) operation); - } else if (operation instanceof CreateTableASOperation) { - CreateTableASOperation createTableASOperation = (CreateTableASOperation) operation; - executeInternal(createTableASOperation.getCreateTableOperation()); - return executeInternal(createTableASOperation.toSinkModifyOperation(catalogManager)); } else if (operation instanceof ExecutePlanOperation) { ExecutePlanOperation executePlanOperation = (ExecutePlanOperation) operation; return (TableResultInternal) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTableASOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CreateTableASOperation.java similarity index 88% rename from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTableASOperation.java rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CreateTableASOperation.java index 53092497fc6..ba59dfe5465 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTableASOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CreateTableASOperation.java @@ -16,14 +16,11 @@ * limitations under the License. 
*/ -package org.apache.flink.table.operations.ddl; +package org.apache.flink.table.operations; import org.apache.flink.annotation.Internal; import org.apache.flink.table.catalog.CatalogManager; -import org.apache.flink.table.operations.Operation; -import org.apache.flink.table.operations.OperationUtils; -import org.apache.flink.table.operations.QueryOperation; -import org.apache.flink.table.operations.SinkModifyOperation; +import org.apache.flink.table.operations.ddl.CreateTableOperation; import java.util.Collections; import java.util.LinkedHashMap; @@ -31,7 +28,7 @@ import java.util.Map; /** Operation to describe a CREATE TABLE AS statement. */ @Internal -public class CreateTableASOperation implements CreateOperation { +public class CreateTableASOperation implements ModifyOperation { private final CreateTableOperation createTableOperation; @@ -66,7 +63,7 @@ public class CreateTableASOperation implements CreateOperation { @Override public String asSummaryString() { Map<String, Object> params = new LinkedHashMap<>(); - params.put("catalogTable", getCreateTableOperation().getCatalogTable().toProperties()); + params.put("catalogTable", getCreateTableOperation().getCatalogTable()); params.put("identifier", getCreateTableOperation().getTableIdentifier()); params.put("ignoreIfExists", getCreateTableOperation().isIgnoreIfExists()); params.put("isTemporary", getCreateTableOperation().isTemporary()); @@ -79,4 +76,14 @@ public class CreateTableASOperation implements CreateOperation { Collections.singletonList(sinkModifyQuery), Operation::asSummaryString); } + + @Override + public QueryOperation getChild() { + return sinkModifyQuery; + } + + @Override + public <T> T accept(ModifyOperationVisitor<T> visitor) { + return visitor.visit(this); + } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyOperationVisitor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyOperationVisitor.java 
index 09c222ac529..98ab9d60f27 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyOperationVisitor.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyOperationVisitor.java @@ -35,4 +35,6 @@ public interface ModifyOperationVisitor<T> { <U> T visit(UnregisteredSinkModifyOperation<U> unregisteredSink); T visit(CollectModifyOperation selectOperation); + + T visit(CreateTableASOperation ctas); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java index 113d42e2e8a..a24fb689181 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java @@ -34,8 +34,8 @@ import org.apache.flink.table.catalog.CatalogTableImpl; import org.apache.flink.table.catalog.ContextResolvedTable; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.operations.CreateTableASOperation; import org.apache.flink.table.operations.Operation; -import org.apache.flink.table.operations.ddl.CreateTableASOperation; import org.apache.flink.table.operations.ddl.CreateTableOperation; import org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator; import org.apache.flink.table.planner.calcite.FlinkPlannerImpl; diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java index d2ae077e132..887d8a8d791 100644 --- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java @@ -99,6 +99,30 @@ public class CompiledPlanITCase extends JsonPlanTestBase { assertResult(DATA, sinkPath); } + @Test + public void testExecuteCtasPlanSql() throws Exception { + createTestCsvSourceTable("src", DATA, COLUMNS_DEFINITION); + + File sinkPath = TEMPORARY_FOLDER.newFolder(); + assertThatThrownBy( + () -> + tableEnv.compilePlanSql( + String.format( + "CREATE TABLE sink\n" + + "WITH (\n" + + " 'connector' = 'filesystem',\n" + + " 'format' = 'testcsv',\n" + + " 'path' = '%s'\n" + + ") AS SELECT * FROM src", + sinkPath.getAbsolutePath())) + .execute()) + .satisfies( + anyCauseMatches( + TableException.class, + "Unsupported SQL query! compilePlanSql() only accepts a single SQL statement" + + " of type INSERT")); + } + @Test public void testExecutePlanTable() throws Exception { File sinkPath = createSourceSinkTables(); diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala index d22c62fe26a..2b3a0c7f1e7 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableSinkTest.scala @@ -806,8 +806,8 @@ class TableSinkTest extends TableTestBase { Assertions .assertThatThrownBy( () => util.tableEnv.explainSql("CREATE TABLE zm_ctas_test AS SELECT * FROM MyTable")) - .hasMessage( - "Unsupported operation: org.apache.flink.table.operations.ddl.CreateTableASOperation") + .hasMessageContaining( + "Unsupported ModifyOperation: org.apache.flink.table.operations.CreateTableASOperation") } } diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala index 6ea965191b9..b4224519faa 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSinkITCase.scala @@ -125,6 +125,22 @@ class TableSinkITCase extends BatchTestBase { val expected = Seq("1,1,Hi", "2,2,Hello", "3,2,Hello world") val result = TableTestUtil.readFromFile(resultPath) Assertions.assertThat(result.sorted).isEqualTo(expected.sorted) + + // test statement set + val statementSet = tEnv.createStatementSet() + val useStatementResultPath = BatchAbstractTestBase.TEMPORARY_FOLDER.newFolder().getAbsolutePath + statementSet.addInsertSql(s""" + |CREATE TABLE MyCtasTableUseStatement + | WITH ( + | 'connector' = 'filesystem', + | 'format' = 'testcsv', + | 'path' = '$useStatementResultPath' + |) AS + | SELECT * FROM MyTable + |""".stripMargin) + statementSet.execute().await() + val useStatementResult = TableTestUtil.readFromFile(useStatementResultPath) + Assertions.assertThat(useStatementResult.sorted).isEqualTo(expected.sorted) } @Test diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala index 5dc7bc70e53..9885bd6a62c 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala @@ -280,6 +280,23 @@ class TableSinkITCase(mode: StateBackendMode) extends 
StreamingWithStateTestBase "+I[jason, 1]" ) Assertions.assertThat(actual.sorted).isEqualTo(expected.sorted) + // test statement set + val statementSet = tEnv.createStatementSet() + statementSet.addInsertSql(""" + |CREATE TABLE MyCtasTableUseStatement + | WITH ( + | 'connector' = 'values', + | 'sink-insert-only' = 'true' + |) AS + | SELECT + | `person`, + | `votes` + | FROM + | src + |""".stripMargin) + statementSet.execute().await() + val actualUseStatement = TestValuesTableFactory.getResults("MyCtasTableUseStatement") + Assertions.assertThat(actualUseStatement.sorted).isEqualTo(expected.sorted) } @Test