This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 78bd8c8  [FLINK-18750][table] SqlValidatorException thrown when select from a view which contains a UDTF call
78bd8c8 is described below

commit 78bd8c8c57be3dee2dff765cea6175a3c8dfc59c
Author: yuzhao.cyz <yuzhao....@gmail.com>
AuthorDate: Tue Aug 25 10:12:23 2020 +0800

    [FLINK-18750][table] SqlValidatorException thrown when select from a view which contains a UDTF call
---
 .../operations/SqlToOperationConverter.java        |  63 +++++---
 .../apache/flink/table/planner/utils/Expander.java | 167 +++++++++++++++++++++
 .../plan/rules/logical/WindowPropertiesRule.scala  |   2 +-
 .../planner/plan/common/ViewsExpandingTest.xml     |  44 ++++++
 .../planner/plan/common/ViewsExpandingTest.scala   |  72 ++++++++-
 5 files changed, 321 insertions(+), 27 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index fa04539..4563ef0 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -110,6 +110,7 @@ import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
 import org.apache.flink.table.operations.ddl.DropViewOperation;
 import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.planner.hint.FlinkHints;
+import org.apache.flink.table.planner.utils.Expander;
 import org.apache.flink.table.planner.utils.OperationConverterUtils;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.utils.TableSchemaUtils;
@@ -132,6 +133,7 @@ import org.apache.calcite.sql.parser.SqlParser;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -286,15 +288,12 @@ public class SqlToOperationConverter {
             SqlAlterViewAs alterViewAs = (SqlAlterViewAs) alterView;
             final SqlNode newQuery = alterViewAs.getNewQuery();
 
-            SqlNode validateQuery = flinkPlanner.validate(newQuery);
-            PlannerQueryOperation operation = toQueryOperation(flinkPlanner, validateQuery);
-            TableSchema schema = operation.getTableSchema();
-
-            String originalQuery = getQuotedSqlString(newQuery);
-            String expandedQuery = getQuotedSqlString(validateQuery);
             CatalogView oldView = (CatalogView) baseTable;
-            CatalogView newView = new CatalogViewImpl(originalQuery, expandedQuery, schema,
-                    oldView.getOptions(), oldView.getComment());
+            CatalogView newView = convertViewQuery(
+                    newQuery,
+                    Collections.emptyList(),
+                    oldView.getOptions(),
+                    oldView.getComment());
             return new AlterViewAsOperation(viewIdentifier, newView);
         } else {
             throw new ValidationException(
@@ -658,7 +657,27 @@ public class SqlToOperationConverter {
         final SqlNode query = sqlCreateView.getQuery();
         final SqlNodeList fieldList = sqlCreateView.getFieldList();
 
-        SqlNode validateQuery = flinkPlanner.validate(query);
+        UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlCreateView.fullViewName());
+        ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+
+        String comment = sqlCreateView.getComment()
+                .map(c -> c.getNlsString().getValue()).orElse(null);
+        CatalogView catalogView = convertViewQuery(
+                query,
+                fieldList.getList(),
+                OperationConverterUtils.extractProperties(sqlCreateView.getProperties()
+                        .orElse(null)),
+                comment);
+        return new CreateViewOperation(
+                identifier,
+                catalogView,
+                sqlCreateView.isIfNotExists(),
+                sqlCreateView.isTemporary());
+    }
+
+    /** Converts the query part of a VIEW statement. */
+    private CatalogView convertViewQuery(SqlNode query, List<SqlNode> fieldNames,
+            Map<String, String> props, String comment) {
         // Put the sql string unparse (getQuotedSqlString()) in front of
         // the node conversion (toQueryOperation()),
         // because before Calcite 1.22.0, during sql-to-rel conversion, the SqlWindow
@@ -666,17 +685,23 @@ public class SqlToOperationConverter {
         // This bug is fixed in CALCITE-3877 of Calcite 1.23.0.
         String originalQuery = getQuotedSqlString(query);
-        String expandedQuery = getQuotedSqlString(validateQuery);
+        SqlNode validateQuery = flinkPlanner.validate(query);
+        // The LATERAL operator was eliminated during SQL validation, so the unparsed SQL
+        // does not contain LATERAL, which is problematic;
+        // the issue was resolved in CALCITE-4077
+        // (always treat the table function as implicitly LATERAL).
+        String expandedQuery = Expander.create(flinkPlanner)
+                .expanded(originalQuery).substitute(this::getQuotedSqlString);
         PlannerQueryOperation operation = toQueryOperation(flinkPlanner, validateQuery);
         TableSchema schema = operation.getTableSchema();
 
         // the view column list in CREATE VIEW is optional, if it's not empty, we should update
         // the column name with the names in view column list.
-        if (!fieldList.getList().isEmpty()) {
+        if (!fieldNames.isEmpty()) {
             // alias column names:
             String[] inputFieldNames = schema.getFieldNames();
-            String[] aliasFieldNames = fieldList.getList().stream()
+            String[] aliasFieldNames = fieldNames.stream()
                     .map(SqlNode::toString)
                     .toArray(String[]::new);
 
@@ -690,21 +715,11 @@ public class SqlToOperationConverter {
             schema = TableSchema.builder().fields(aliasFieldNames, inputFieldTypes).build();
         }
 
-        String comment = sqlCreateView.getComment().map(c -> c.getNlsString().getValue()).orElse(null);
-        CatalogView catalogView = new CatalogViewImpl(originalQuery,
+        return new CatalogViewImpl(originalQuery,
                 expandedQuery,
                 schema,
-                OperationConverterUtils.extractProperties(sqlCreateView.getProperties().orElse(null)),
+                props,
                 comment);
-
-        UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlCreateView.fullViewName());
-        ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
-
-        return new CreateViewOperation(
-                identifier,
-                catalogView,
-                sqlCreateView.isIfNotExists(),
-                sqlCreateView.isTemporary());
     }
 
     /** Convert DROP VIEW statement. */
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/Expander.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/Expander.java
new file mode 100644
index 0000000..8a3e403
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/Expander.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils;
+
+import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableMap;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlUnresolvedFunction;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.util.SqlBasicVisitor;
+import org.apache.calcite.sql.util.SqlShuttle;
+import org.apache.calcite.util.Util;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+
+/**
+ * Utility that expands SQL identifiers in a SQL query.
+ *
+ * <p>Simple use:
+ *
+ * <blockquote><code>
+ * final String sql =<br>
+ *   "select ename from emp where deptno < 10";<br>
+ * final Expander.Expanded expanded =<br>
+ *   Expander.create(planner).expanded(sql);<br>
+ * print(expanded); // "select `emp`.`ename` from `catalog`.`db`.`emp` where `emp`.`deptno` < 10"
+ * </code></blockquote>
+ *
+ * <p>Calling {@link Expanded#toString()} generates a string that is similar to
+ * SQL where a user has manually converted all identifiers as expanded, and
+ * which could then be persisted as the expanded query of a Catalog view.
+ *
+ * <p>For more advanced formatting, use {@link Expanded#substitute(Function)}.
+ *
+ * <p>Adjust {@link SqlParser.Config} to use a different parser or parsing options.
+ */
+public class Expander {
+    private final FlinkPlannerImpl planner;
+
+    private Expander(FlinkPlannerImpl planner) {
+        this.planner = Objects.requireNonNull(planner);
+    }
+
+    /** Creates an Expander. */
+    public static Expander create(FlinkPlannerImpl planner) {
+        return new Expander(planner);
+    }
+
+    /** Expands identifiers in a given SQL string, returning an {@link Expanded}. */
+    public Expanded expanded(String ori) {
+        final Map<SqlParserPos, SqlIdentifier> identifiers = new HashMap<>();
+        final Map<String, SqlIdentifier> funcNameToId = new HashMap<>();
+        final SqlNode oriNode = planner.parser().parse(ori);
+        // Parse again because validation is stateful, which means the node tree was probably
+        // mutated.
+        final SqlNode validated = planner.validate(planner.parser().parse(ori));
+        validated.accept(new SqlBasicVisitor<Void>() {
+            @Override public Void visit(SqlCall call) {
+                SqlOperator operator = call.getOperator();
+                if (operator instanceof BridgingSqlFunction) {
+                    final SqlIdentifier functionID = ((BridgingSqlFunction) operator)
+                            .getSqlIdentifier();
+                    if (!functionID.isSimple()) {
+                        funcNameToId.put(Util.last(functionID.names), functionID);
+                    }
+                }
+                return super.visit(call);
+            }
+
+            @Override public Void visit(SqlIdentifier identifier) {
+                identifiers.putIfAbsent(identifier.getParserPosition(), identifier);
+                return null;
+            }
+        });
+        return new Expanded(oriNode, identifiers, funcNameToId);
+    }
+
+    /** Result of expanding. */
+    public static class Expanded {
+        private final SqlNode oriNode;
+        private final Map<SqlParserPos, SqlIdentifier> identifiersMap;
+        private final Map<String, SqlIdentifier> funcNameToId;
+
+        Expanded(SqlNode oriNode, Map<SqlParserPos, SqlIdentifier> identifiers,
+                Map<String, SqlIdentifier> funcNameToId) {
+            this.oriNode = oriNode;
+            this.identifiersMap = ImmutableMap.copyOf(identifiers);
+            this.funcNameToId = ImmutableMap.copyOf(funcNameToId);
+        }
+
+        @Override
+        public String toString() {
+            return substitute(SqlNode::toString);
+        }
+
+        /** Returns the SQL string with identifiers replaced according to the
+         * given unparse function. */
+        public String substitute(Function<SqlNode, String> fn) {
+            final SqlShuttle shuttle = new SqlShuttle() {
+                @Override
+                public SqlNode visit(SqlCall call) {
+                    SqlOperator operator = call.getOperator();
+                    if (operator instanceof SqlUnresolvedFunction) {
+                        final SqlUnresolvedFunction unresolvedFunction =
+                                (SqlUnresolvedFunction) operator;
+                        final SqlIdentifier functionID = unresolvedFunction.getSqlIdentifier();
+                        if (functionID.isSimple()
+                                && funcNameToId.containsKey(functionID.getSimple())) {
+                            SqlUnresolvedFunction newFunc = new SqlUnresolvedFunction(
+                                    funcNameToId.get(functionID.getSimple()),
+                                    unresolvedFunction.getReturnTypeInference(),
+                                    unresolvedFunction.getOperandTypeInference(),
+                                    unresolvedFunction.getOperandTypeChecker(),
+                                    unresolvedFunction.getParamTypes(),
+                                    unresolvedFunction.getFunctionType());
+                            return newFunc.createCall(
+                                    call.getFunctionQuantifier(),
+                                    call.getParserPosition(),
+                                    call.getOperandList().toArray(new SqlNode[0]));
+                        }
+                    }
+                    return super.visit(call);
+                }
+
+                @Override
+                public SqlNode visit(SqlIdentifier id) {
+                    if (id.isStar()) {
+                        return id;
+                    }
+                    final SqlIdentifier toReplace = identifiersMap.get(id.getParserPosition());
+                    if (toReplace == null || id.names.size() >= toReplace.names.size()) {
+                        return id;
+                    }
+                    return toReplace;
+                }
+            };
+            final SqlNode substituted = this.oriNode.accept(shuttle);
+            return fn.apply(substituted);
+        }
+    }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/WindowPropertiesRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/WindowPropertiesRule.scala
index 2d0b5c0..c0c92df8 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/WindowPropertiesRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/WindowPropertiesRule.scala
@@ -45,7 +45,7 @@ class WindowPropertiesRule extends RelOptRule(
 
   override def matches(call: RelOptRuleCall): Boolean = {
     val project: LogicalProject = call.rel(0)
-    // project includes at least on group auxiliary function
+    // project includes at least one group auxiliary function
     project.getProjects.exists(WindowPropertiesRules.hasGroupAuxiliaries)
   }
 
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.xml
index cbe66fd..ce101a7 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.xml
@@ -161,4 +161,48 @@ Calc(select=[CAST(a) AS a, b, CAST(EXPR$2) AS c])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testViewExpandingWithLateralTableFunction[0]">
+    <Resource name="sql">
+      <![CDATA[select * from tmp_view]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(f0=[$0], f1=[$1])
++- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{0}])
+   :- LogicalProject(f0=[AS($0, _UTF-16LE'f0')])
+   :  +- LogicalValues(tuples=[[{ _UTF-16LE'danny#21' }, { _UTF-16LE'julian#55' }, { _UTF-16LE'fabian#30' }]])
+   +- LogicalTableFunctionScan(invocation=[myFunc($cor1.f0)], rowType=[*org.apache.flink.table.planner.utils.SimpleUser*])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[f0, name AS f1])
++- Correlate(invocation=[myFunc($cor1.f0)], correlate=[table(myFunc($cor1.f0))], select=[f0,name,age], rowType=[RecordType(VARCHAR(9) f0, VARCHAR(2147483647) name, INTEGER age)], joinType=[INNER])
+   +- Calc(select=[f0])
+      +- Values(tuples=[[{ _UTF-16LE'danny#21' }, { _UTF-16LE'julian#55' }, { _UTF-16LE'fabian#30' }]], values=[f0])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testViewExpandingWithLateralTableFunction[1]">
+    <Resource name="sql">
+      <![CDATA[select * from tmp_view]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(f0=[$0], f1=[$1])
++- LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{0}])
+   :- LogicalProject(f0=[AS($0, _UTF-16LE'f0')])
+   :  +- LogicalValues(tuples=[[{ _UTF-16LE'danny#21' }, { _UTF-16LE'julian#55' }, { _UTF-16LE'fabian#30' }]])
+   +- LogicalTableFunctionScan(invocation=[myFunc($cor1.f0)], rowType=[*org.apache.flink.table.planner.utils.SimpleUser*])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[f0, name AS f1])
++- Correlate(invocation=[myFunc($cor1.f0)], correlate=[table(myFunc($cor1.f0))], select=[f0,name,age], rowType=[RecordType(VARCHAR(9) f0, VARCHAR(2147483647) name, INTEGER age)], joinType=[INNER])
+   +- Calc(select=[f0])
+      +- Values(tuples=[[{ _UTF-16LE'danny#21' }, { _UTF-16LE'julian#55' }, { _UTF-16LE'fabian#30' }]])
+]]>
+    </Resource>
+  </TestCase>
 </Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.scala
index 5bdb6b5..666de8d 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/common/ViewsExpandingTest.scala
@@ -20,9 +20,13 @@ package org.apache.flink.table.planner.plan.common
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
-import org.apache.flink.table.catalog.{CatalogView, CatalogViewImpl, ObjectPath}
-import org.apache.flink.table.planner.utils.{TableTestBase, TableTestUtil, TableTestUtilBase}
+import org.apache.flink.table.catalog.{CatalogBaseTable, CatalogView, CatalogViewImpl, ObjectIdentifier, ObjectPath}
+import org.apache.flink.table.functions.ScalarFunction
+import org.apache.flink.table.planner.plan.common.ViewsExpandingTest.PrimitiveScalarFunction
+import org.apache.flink.table.planner.utils.{TableFunc0, TableTestBase, TableTestUtil, TableTestUtilBase}
+
+import org.hamcrest.CoreMatchers.is
+import org.junit.Assert.assertThat
 import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -114,6 +118,60 @@ class ViewsExpandingTest(tableTestUtil: TableTestBase => TableTestUtil) extends
     tableUtil.verifyPlan("select * from view1")
   }
 
+  @Test
+  def testViewExpandingWithLateralTableFunction(): Unit = {
+    val tableUtil = tableTestUtil(this)
+    val tableEnv = tableUtil.tableEnv
+    tableEnv.createTemporarySystemFunction("myFunc", new TableFunc0())
+    tableEnv.createTemporaryView("source",
+      tableEnv.fromValues("danny#21", "julian#55", "fabian#30").as("f0"))
+    val createView =
+      """
+        |CREATE VIEW tmp_view AS
+        |  SELECT f0, f1
+        |  FROM source as S, LATERAL TABLE(myFunc(f0)) as T(f1, f2)
+        |""".stripMargin
+    tableEnv.executeSql(createView)
+    tableUtil.verifyPlan("select * from tmp_view")
+  }
+
+  @Test
+  def testViewExpandingWithBuiltinFunction(): Unit = {
+    val tableUtil = tableTestUtil(this)
+    val tableEnv = tableUtil.tableEnv
+    val createView =
+      """
+        |CREATE VIEW tmp_view AS
+        |  SELECT CONCAT('a', 'bc', 'def')
+        |""".stripMargin
+    tableEnv.executeSql(createView)
+    val objectID = ObjectIdentifier.of(tableEnv.getCurrentCatalog,
+      tableEnv.getCurrentDatabase, "tmp_view")
+    val view: CatalogBaseTable = tableEnv.getCatalog(objectID.getCatalogName)
+      .get().getTable(objectID.toObjectPath)
+    assertThat(view.asInstanceOf[CatalogView].getExpandedQuery,
+      is("SELECT `CONCAT`('a', 'bc', 'def')"))
+  }
+
+  @Test
+  def testViewExpandingWithUDF(): Unit = {
+    val tableUtil = tableTestUtil(this)
+    val tableEnv = tableUtil.tableEnv
+    tableEnv.createTemporaryFunction("func", classOf[PrimitiveScalarFunction])
+    val createView =
+      """
+        |CREATE VIEW tmp_view AS
+        |  SELECT func(1, 2, 'abc')
+        |""".stripMargin
+    tableEnv.executeSql(createView)
+    val objectID = ObjectIdentifier.of(tableEnv.getCurrentCatalog,
+      tableEnv.getCurrentDatabase, "tmp_view")
+    val view: CatalogBaseTable = tableEnv.getCatalog(objectID.getCatalogName)
+      .get().getTable(objectID.toObjectPath)
+    assertThat(view.asInstanceOf[CatalogView].getExpandedQuery,
+      is("SELECT `default_catalog`.`default_database`.`func`(1, 2, 'abc')"))
+  }
+
   private def createSqlView(originTable: String): CatalogView = {
     new CatalogViewImpl(
       s"select * as c from $originTable",
@@ -137,4 +195,14 @@ object ViewsExpandingTest {
     _.batchTestUtil(),
     _.streamTestUtil())
   }
+
+  // --------------------------------------------------------------------------------------------
+  // Test functions
+  // --------------------------------------------------------------------------------------------
+  /**
+   * Function that takes and returns primitives.
+   */
+  class PrimitiveScalarFunction extends ScalarFunction {
+    def eval(i: Int, l: Long, s: String): Long = i + l + s.length
+  }
 }
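For context, the failure scenario exercised by the new testViewExpandingWithLateralTableFunction case boils down to plain SQL. The sketch below reuses the test's own fixtures (the `source` table and the `myFunc` table function registered in that test); it is illustrative and not part of the commit itself:

    -- A view whose definition calls a UDTF through LATERAL TABLE. Before this change the
    -- expanded query persisted in the catalog was unparsed from the validated node tree,
    -- which had dropped the LATERAL operator (see the CALCITE-4077 note in the diff), so a
    -- later read of the view failed with a SqlValidatorException.
    CREATE VIEW tmp_view AS
      SELECT f0, f1
      FROM source as S, LATERAL TABLE(myFunc(f0)) as T(f1, f2);

    -- With the new Expander, the persisted expanded query is derived from the original
    -- (pre-validation) SQL, with identifiers and function names fully qualified, so this
    -- now plans as the test verifies:
    SELECT * FROM tmp_view;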