This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bebf3b5a105dd4bc21882116570c6d71299269a6
Author: Timo Walther <twal...@apache.org>
AuthorDate: Thu Mar 11 12:22:26 2021 +0100

    [FLINK-21725][table] Add tests for large tuples
    
    This closes #15154.
---
 .../types/extraction/DataTypeExtractorTest.java    | 46 +++++++++++++++++++++-
 .../utils/JavaUserDefinedTableFunctions.java       | 37 +++++++++++++++++
 .../planner/plan/stream/table/CorrelateTest.xml    | 22 +++++++++++
 .../planner/plan/stream/table/CorrelateTest.scala  | 16 ++++++++
 4 files changed, 120 insertions(+), 1 deletion(-)

diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
index ea42375..531fcf2 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.types.extraction;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple12;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.table.annotation.DataTypeHint;
 import org.apache.flink.table.annotation.HintFlag;
@@ -445,7 +446,27 @@ public class DataTypeExtractorTest {
                                                         DataTypes.INT(), 
DataTypes.STRING())))),
                 TestSpec.forType("Invalid data view", 
AccumulatorWithInvalidView.class)
                         .expectErrorMessage(
-                                "Annotated list views should have a logical 
type of ARRAY."));
+                                "Annotated list views should have a logical 
type of ARRAY."),
+                TestSpec.forGeneric(
+                                "Assigning constructor for tuples",
+                                TableFunction.class,
+                                0,
+                                Tuple12TableFunction.class)
+                        .expectDataType(
+                                DataTypes.STRUCTURED(
+                                        Tuple12.class,
+                                        DataTypes.FIELD("f0", 
DataTypes.STRING()),
+                                        DataTypes.FIELD("f1", 
DataTypes.STRING()),
+                                        DataTypes.FIELD("f2", 
DataTypes.STRING()),
+                                        DataTypes.FIELD("f3", 
DataTypes.STRING()),
+                                        DataTypes.FIELD("f4", 
DataTypes.STRING()),
+                                        DataTypes.FIELD("f5", 
DataTypes.STRING()),
+                                        DataTypes.FIELD("f6", 
DataTypes.STRING()),
+                                        DataTypes.FIELD("f7", 
DataTypes.STRING()),
+                                        DataTypes.FIELD("f8", 
DataTypes.STRING()),
+                                        DataTypes.FIELD("f9", 
DataTypes.STRING()),
+                                        DataTypes.FIELD("f10", 
DataTypes.STRING()),
+                                        DataTypes.FIELD("f11", 
DataTypes.INT()))));
     }
 
     @Parameter public TestSpec testSpec;
@@ -1024,4 +1045,27 @@ public class DataTypeExtractorTest {
         @DataTypeHint("INT")
         public ListView<?> listView;
     }
+
+    // 
--------------------------------------------------------------------------------------------
+
+    /** Table function that uses a big tuple with constructor-defined field 
order. */
+    public static class Tuple12TableFunction
+            extends TableFunction<
+                    Tuple12<
+                            String,
+                            String,
+                            String,
+                            String,
+                            String,
+                            String,
+                            String,
+                            String,
+                            String,
+                            String,
+                            String,
+                            Integer>> {
+        public void eval() {
+            // nothing to do
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java
index 3384257..f7f0d38 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.runtime.utils;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.tuple.Tuple12;
 import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.functions.TableFunction;
 
@@ -128,4 +129,40 @@ public class JavaUserDefinedTableFunctions {
             return false;
         }
     }
+
+    /** Emits a Tuple12 of six suffixed strings and six length-based integers per input. */
+    public static class JavaTableFuncTuple12
+            extends TableFunction<
+                    Tuple12<
+                            String,
+                            String,
+                            String,
+                            String,
+                            String,
+                            String,
+                            Integer,
+                            Integer,
+                            Integer,
+                            Integer,
+                            Integer,
+                            Integer>> {
+        private static final long serialVersionUID = -8258882510989374448L;
+
+        public void eval(String str) {
+            collect(
+                    Tuple12.of(
+                            str + "_a",
+                            str + "_b",
+                            str + "_c",
+                            str + "_d",
+                            str + "_e",
+                            str + "_f",
+                            str.length(),
+                            str.length() + 1,
+                            str.length() + 2,
+                            str.length() + 3,
+                            str.length() + 4,
+                            str.length() + 5));
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml
 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml
index ef1ef99..3dabac3 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml
+++ 
b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml
@@ -31,6 +31,28 @@ 
PythonCorrelate(invocation=[org$apache$flink$table$planner$utils$MockPythonTable
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testCorrelateTuple12">
+    <Resource name="sql">
+      <![CDATA[
+SELECT *
+FROM MyTable, LATERAL TABLE(func1(c)) AS T
+]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], f0=[$3], f1=[$4], f2=[$5], f3=[$6], 
f4=[$7], f5=[$8], f6=[$9], f7=[$10], f8=[$11], f9=[$12], f10=[$13], f11=[$14])
++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], 
requiredColumns=[{2}])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]])
+   +- LogicalTableFunctionScan(invocation=[func1($cor0.c)], 
rowType=[*org.apache.flink.api.java.tuple.Tuple12* NOT NULL])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Correlate(invocation=[func1($cor0.c)], correlate=[table(func1($cor0.c))], 
select=[a,b,c,f0,f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f11], 
rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) f0, VARCHAR(2147483647) f1, VARCHAR(2147483647) f2, 
VARCHAR(2147483647) f3, VARCHAR(2147483647) f4, VARCHAR(2147483647) f5, INTEGER 
f6, INTEGER f7, INTEGER f8, INTEGER f9, INTEGER f10, INTEGER f11)], 
joinType=[INNER])
++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, 
source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testCorrelateWithMultiFilter">
     <Resource name="ast">
       <![CDATA[
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala
index 789b089..8ce336e 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala
@@ -21,6 +21,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.table.api._
 import org.apache.flink.table.planner.expressions.utils.Func13
 import org.apache.flink.table.planner.plan.optimize.program.FlinkStreamProgram
+import 
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.JavaTableFuncTuple12
 import org.apache.flink.table.planner.utils._
 
 import org.apache.calcite.rel.rules.CoreRules
@@ -188,4 +189,19 @@ class CorrelateTest extends TableTestBase {
 
     util.verifyExecPlan(result)
   }
+
+  @Test
+  def testCorrelateTuple12(): Unit = {
+    val util = streamTestUtil()
+    util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val function = new JavaTableFuncTuple12
+    util.addTemporarySystemFunction("func1", function) // invoked as func1(c) in the query below
+    val sql =
+      """
+        |SELECT *
+        |FROM MyTable, LATERAL TABLE(func1(c)) AS T
+        |""".stripMargin
+
+    util.verifyExecPlan(sql)
+  }
 }

Reply via email to