This is an automated email from the ASF dual-hosted git repository.

jchan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new db934bfe377 [FLINK-20539][table-planner] Fix type mismatch when using ROW in computed column
db934bfe377 is described below

commit db934bfe3772cc44263d42e875a5957ea00c15c1
Author: Xuyang <xyzhong...@163.com>
AuthorDate: Wed Nov 22 15:53:22 2023 +0800

    [FLINK-20539][table-planner] Fix type mismatch when using ROW in computed column
    
    This closes #23519
---
 .../org/apache/calcite/sql/fun/SqlRowOperator.java | 46 ++++++++++++++++------
 .../planner/plan/stream/sql/TableScanTest.xml      | 18 +++++++++
 .../planner/plan/stream/sql/TableScanTest.scala    | 14 +++++++
 3 files changed, 66 insertions(+), 12 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/fun/SqlRowOperator.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/fun/SqlRowOperator.java
index d861f2af623..e2ab560fd06 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/fun/SqlRowOperator.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/fun/SqlRowOperator.java
@@ -19,6 +19,7 @@
 package org.apache.calcite.sql.fun;
 
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.StructKind;
 import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlOperatorBinding;
@@ -27,14 +28,17 @@ import org.apache.calcite.sql.SqlUtil;
 import org.apache.calcite.sql.SqlWriter;
 import org.apache.calcite.sql.type.InferTypes;
 import org.apache.calcite.sql.type.OperandTypes;
-import org.apache.calcite.util.Pair;
 
 import java.util.AbstractList;
-import java.util.Map;
 
 /**
- * Copied to keep null semantics of table api and sql in sync. At the same time SQL standard says
- * that the next about `ROW`:
+ * Copied to keep null semantics of table api and sql in sync.
+ *
+ * <p>There are the following differences:
+ *
+ * <p>1. The return value of {@code R IS NULL} and {@code R IS NOT NULL}.
+ *
+ * <p>The SQL standard says the following about `ROW`:
  *
  * <ul>
  *   <li>The value of {@code R IS NULL} is:
@@ -66,12 +70,19 @@ import java.util.Map;
  *       </ul>
  * </ul>
  *
- * Once Flink applies same logic for both table api and sql, this class should be removed.
+ * <p>Once Flink applies the same logic for both table api and sql, this first change should be
+ * removed.
+ *
+ * <p>2. It uses {@link StructKind#PEEK_FIELDS_NO_EXPAND} with a nested struct type (Flink {@link
+ * org.apache.flink.table.types.logical.RowType}).
+ *
+ * <p>See more at {@link org.apache.flink.table.planner.typeutils.LogicalRelDataTypeConverter} and
+ * {@link org.apache.flink.table.planner.calcite.FlinkTypeFactory}.
  *
  * <p>Changed lines
  *
  * <ol>
- *   <li>Line 92 ~ 112
+ *   <li>Line 106 ~ 137
  * </ol>
  */
 public class SqlRowOperator extends SqlSpecialOperator {
@@ -96,20 +107,31 @@ public class SqlRowOperator extends SqlSpecialOperator {
         // The type of a ROW(e1,e2) expression is a record with the types
         // {e1type,e2type}.  According to the standard, field names are
         // implementation-defined.
+        int fieldCount = opBinding.getOperandCount();
         return opBinding
                 .getTypeFactory()
                 .createStructType(
-                        new AbstractList<Map.Entry<String, RelDataType>>() {
+                        StructKind.PEEK_FIELDS_NO_EXPAND,
+                        new AbstractList<RelDataType>() {
+                            @Override
+                            public RelDataType get(int index) {
+                                return opBinding.getOperandType(index);
+                            }
+
+                            @Override
+                            public int size() {
+                                return fieldCount;
+                            }
+                        },
+                        new AbstractList<String>() {
                             @Override
-                            public Map.Entry<String, RelDataType> get(int index) {
-                                return Pair.of(
-                                        SqlUtil.deriveAliasFromOrdinal(index),
-                                        opBinding.getOperandType(index));
+                            public String get(int index) {
+                                return SqlUtil.deriveAliasFromOrdinal(index);
                             }
 
                             @Override
                             public int size() {
-                                return opBinding.getOperandCount();
+                                return fieldCount;
                             }
                         });
         // ----- FLINK MODIFICATION END -----
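
For context on the StructKind change above: the sketch below (not part of this commit; the class name, field types, and field names are illustrative assumptions) shows how Calcite's RelDataTypeFactory builds a struct type with an explicit StructKind, mirroring what the modified return-type inference now does for ROW(e1, e2):

    import org.apache.calcite.rel.type.RelDataType;
    import org.apache.calcite.rel.type.RelDataTypeFactory;
    import org.apache.calcite.rel.type.RelDataTypeSystem;
    import org.apache.calcite.rel.type.StructKind;
    import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
    import org.apache.calcite.sql.type.SqlTypeName;

    import java.util.Arrays;

    public class RowStructTypeSketch {
        public static void main(String[] args) {
            RelDataTypeFactory typeFactory =
                    new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
            // Mirrors ROW(e1, e2): operand types become the field types, and
            // field names follow SqlUtil.deriveAliasFromOrdinal ("EXPR$n").
            RelDataType rowType =
                    typeFactory.createStructType(
                            StructKind.PEEK_FIELDS_NO_EXPAND,
                            Arrays.asList(
                                    typeFactory.createSqlType(SqlTypeName.INTEGER),
                                    typeFactory.createSqlType(SqlTypeName.VARCHAR, 100)),
                            Arrays.asList("EXPR$0", "EXPR$1"));
            // PEEK_FIELDS_NO_EXPAND keeps the nested fields addressable without
            // expanding them in star queries, matching Flink's RowType mapping.
            System.out.println(rowType.getFullTypeString());
        }
    }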
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
index 71d028cd89a..52d21087262 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
@@ -233,6 +233,24 @@ LogicalProject(timestamp=[$0], metadata_timestamp=[$1], other=[$2], computed_oth
       <![CDATA[
 Calc(select=[timestamp, metadata_timestamp, other, UPPER(other) AS computed_other, CAST(metadata_timestamp AS VARCHAR(2147483647)) AS computed_timestamp])
 +- TableSourceScan(table=[[default_catalog, default_database, MetadataTable, metadata=[other, timestamp]]], fields=[timestamp, other, metadata_timestamp])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testDDLWithRowTypeComputedColumn">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM t1]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2])
++- LogicalProject(a=[$0], b=[$1], c=[ROW($0, $1)])
+   +- LogicalTableScan(table=[[default_catalog, default_database, t1]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a, b, ROW(a, b) AS c])
++- TableSourceScan(table=[[default_catalog, default_database, t1]], fields=[a, b])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
index 2cd7f047e98..63f69626b3d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
@@ -78,6 +78,20 @@ class TableScanTest extends TableTestBase {
     util.verifyExecPlan("SELECT * FROM t1")
   }
 
+  @Test
+  def testDDLWithRowTypeComputedColumn(): Unit = {
+    util.addTable(s"""
+                     |create table t1(
+                     |  a int,
+                     |  b varchar,
+                     |  c as row(a, b)
+                     |) with (
+                     |  'connector' = 'values'
+                     |)
+       """.stripMargin)
+    util.verifyExecPlan("SELECT * FROM t1")
+  }
+
   @Test
   def testDDLWithMetadataColumn(): Unit = {
     // tests reordering, skipping, and casting of metadata
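
End to end, the fixed DDL pattern can also be exercised directly. A minimal sketch, assuming a Flink version containing this fix and the built-in 'datagen' connector (the table name, schema, and options are illustrative, not part of this commit):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class RowComputedColumnExample {
        public static void main(String[] args) {
            TableEnvironment env =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // Before FLINK-20539, a computed column defined via ROW(...) could
            // fail with a type mismatch during planning.
            env.executeSql(
                    "CREATE TABLE t1 ("
                            + " a INT,"
                            + " b STRING,"
                            + " c AS ROW(a, b)"
                            + ") WITH ("
                            + " 'connector' = 'datagen',"
                            + " 'number-of-rows' = '3'"
                            + ")");
            env.executeSql("SELECT * FROM t1").print();
        }
    }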
