This is an automated email from the ASF dual-hosted git repository. jchan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new db934bfe377 [FLINK-20539][table-planner] Fix type mismatch when using ROW in computed column db934bfe377 is described below commit db934bfe3772cc44263d42e875a5957ea00c15c1 Author: Xuyang <xyzhong...@163.com> AuthorDate: Wed Nov 22 15:53:22 2023 +0800 [FLINK-20539][table-planner] Fix type mismatch when using ROW in computed column This closes #23519 --- .../org/apache/calcite/sql/fun/SqlRowOperator.java | 46 ++++++++++++++++------ .../planner/plan/stream/sql/TableScanTest.xml | 18 +++++++++ .../planner/plan/stream/sql/TableScanTest.scala | 14 +++++++ 3 files changed, 66 insertions(+), 12 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/fun/SqlRowOperator.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/fun/SqlRowOperator.java index d861f2af623..e2ab560fd06 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/fun/SqlRowOperator.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/fun/SqlRowOperator.java @@ -19,6 +19,7 @@ package org.apache.calcite.sql.fun; import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.StructKind; import org.apache.calcite.sql.SqlCall; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlOperatorBinding; @@ -27,14 +28,17 @@ import org.apache.calcite.sql.SqlUtil; import org.apache.calcite.sql.SqlWriter; import org.apache.calcite.sql.type.InferTypes; import org.apache.calcite.sql.type.OperandTypes; -import org.apache.calcite.util.Pair; import java.util.AbstractList; -import java.util.Map; /** - * Copied to keep null semantics of table api and sql in sync. At the same time SQL standard says - * that the next about `ROW`: + * Copied to keep null semantics of table api and sql in sync. + * + * <p>There are differences following: + * + * <p>1. The return value about {@code R IS NULL} and {@code R IS NOT NULL}. + * + * <p>At the same time SQL standard says that the next about `ROW`: * * <ul> * <li>The value of {@code R IS NULL} is: @@ -66,12 +70,19 @@ import java.util.Map; * </ul> * </ul> * - * Once Flink applies same logic for both table api and sql, this class should be removed. + * <p>Once Flink applies same logic for both table api and sql, this first changes should be + * removed. + * + * <p>2. It uses {@link StructKind#PEEK_FIELDS_NO_EXPAND} with a nested struct type (Flink [[{@link + * org.apache.flink.table.types.logical.RowType}]]). + * + * <p>See more at {@link org.apache.flink.table.planner.typeutils.LogicalRelDataTypeConverter} and + * {@link org.apache.flink.table.planner.calcite.FlinkTypeFactory}. * * <p>Changed lines * * <ol> - * <li>Line 92 ~ 112 + * <li>Line 106 ~ 137 * </ol> */ public class SqlRowOperator extends SqlSpecialOperator { @@ -96,20 +107,31 @@ public class SqlRowOperator extends SqlSpecialOperator { // The type of a ROW(e1,e2) expression is a record with the types // {e1type,e2type}. According to the standard, field names are // implementation-defined. + int fieldCount = opBinding.getOperandCount(); return opBinding .getTypeFactory() .createStructType( - new AbstractList<Map.Entry<String, RelDataType>>() { + StructKind.PEEK_FIELDS_NO_EXPAND, + new AbstractList<RelDataType>() { + @Override + public RelDataType get(int index) { + return opBinding.getOperandType(index); + } + + @Override + public int size() { + return fieldCount; + } + }, + new AbstractList<String>() { @Override - public Map.Entry<String, RelDataType> get(int index) { - return Pair.of( - SqlUtil.deriveAliasFromOrdinal(index), - opBinding.getOperandType(index)); + public String get(int index) { + return SqlUtil.deriveAliasFromOrdinal(index); } @Override public int size() { - return opBinding.getOperandCount(); + return fieldCount; } }); // ----- FLINK MODIFICATION END ----- diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml index 71d028cd89a..52d21087262 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml @@ -233,6 +233,24 @@ LogicalProject(timestamp=[$0], metadata_timestamp=[$1], other=[$2], computed_oth <![CDATA[ Calc(select=[timestamp, metadata_timestamp, other, UPPER(other) AS computed_other, CAST(metadata_timestamp AS VARCHAR(2147483647)) AS computed_timestamp]) +- TableSourceScan(table=[[default_catalog, default_database, MetadataTable, metadata=[other, timestamp]]], fields=[timestamp, other, metadata_timestamp]) +]]> + </Resource> + </TestCase> + <TestCase name="testDDLWithRowTypeComputedColumn"> + <Resource name="sql"> + <![CDATA[SELECT * FROM t1]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2]) ++- LogicalProject(a=[$0], b=[$1], c=[ROW($0, $1)]) + +- LogicalTableScan(table=[[default_catalog, default_database, t1]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +Calc(select=[a, b, ROW(a, b) AS c]) ++- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[a, b]) ]]> </Resource> </TestCase> diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala index 2cd7f047e98..63f69626b3d 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala @@ -78,6 +78,20 @@ class TableScanTest extends TableTestBase { util.verifyExecPlan("SELECT * FROM t1") } + @Test + def testDDLWithRowTypeComputedColumn(): Unit = { + util.addTable(s""" + |create table t1( + | a int, + | b varchar, + | c as row(a, b) + |) with ( + | 'connector' = 'values' + |) + """.stripMargin) + util.verifyExecPlan("SELECT * FROM t1") + } + @Test def testDDLWithMetadataColumn(): Unit = { // tests reordering, skipping, and casting of metadata