This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit bebf3b5a105dd4bc21882116570c6d71299269a6 Author: Timo Walther <twal...@apache.org> AuthorDate: Thu Mar 11 12:22:26 2021 +0100 [FLINK-21725][table] Add tests for large tuples This closes #15154. --- .../types/extraction/DataTypeExtractorTest.java | 46 +++++++++++++++++++++- .../utils/JavaUserDefinedTableFunctions.java | 37 +++++++++++++++++ .../planner/plan/stream/table/CorrelateTest.xml | 22 +++++++++++ .../planner/plan/stream/table/CorrelateTest.scala | 16 ++++++++ 4 files changed, 120 insertions(+), 1 deletion(-) diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java index ea42375..531fcf2 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/DataTypeExtractorTest.java @@ -20,6 +20,7 @@ package org.apache.flink.table.types.extraction; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.java.tuple.Tuple12; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.annotation.HintFlag; @@ -445,7 +446,27 @@ public class DataTypeExtractorTest { DataTypes.INT(), DataTypes.STRING())))), TestSpec.forType("Invalid data view", AccumulatorWithInvalidView.class) .expectErrorMessage( - "Annotated list views should have a logical type of ARRAY.")); + "Annotated list views should have a logical type of ARRAY."), + TestSpec.forGeneric( + "Assigning constructor for tuples", + TableFunction.class, + 0, + Tuple12TableFunction.class) + .expectDataType( + DataTypes.STRUCTURED( + Tuple12.class, + DataTypes.FIELD("f0", DataTypes.STRING()), + DataTypes.FIELD("f1", DataTypes.STRING()), + DataTypes.FIELD("f2", DataTypes.STRING()), + DataTypes.FIELD("f3", DataTypes.STRING()), + DataTypes.FIELD("f4", DataTypes.STRING()), + DataTypes.FIELD("f5", DataTypes.STRING()), + DataTypes.FIELD("f6", DataTypes.STRING()), + DataTypes.FIELD("f7", DataTypes.STRING()), + DataTypes.FIELD("f8", DataTypes.STRING()), + DataTypes.FIELD("f9", DataTypes.STRING()), + DataTypes.FIELD("f10", DataTypes.STRING()), + DataTypes.FIELD("f11", DataTypes.INT())))); } @Parameter public TestSpec testSpec; @@ -1024,4 +1045,27 @@ public class DataTypeExtractorTest { @DataTypeHint("INT") public ListView<?> listView; } + + // -------------------------------------------------------------------------------------------- + + /** Table function that uses a big tuple with constructor defined field order. */ + public static class Tuple12TableFunction + extends TableFunction< + Tuple12< + String, + String, + String, + String, + String, + String, + String, + String, + String, + String, + String, + Integer>> { + public void eval() { + // nothing to do + } + } } diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java index 3384257..f7f0d38 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedTableFunctions.java @@ -20,6 +20,7 @@ package org.apache.flink.table.planner.runtime.utils; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.tuple.Tuple12; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.functions.TableFunction; @@ -128,4 +129,40 @@ public class JavaUserDefinedTableFunctions { return false; } } + + /** Function with large tuple. */ + public static class JavaTableFuncTuple12 + extends TableFunction< + Tuple12< + String, + String, + String, + String, + String, + String, + Integer, + Integer, + Integer, + Integer, + Integer, + Integer>> { + private static final long serialVersionUID = -8258882510989374448L; + + public void eval(String str) { + collect( + Tuple12.of( + str + "_a", + str + "_b", + str + "_c", + str + "_d", + str + "_e", + str + "_f", + str.length(), + str.length() + 1, + str.length() + 2, + str.length() + 3, + str.length() + 4, + str.length() + 5)); + } + } } diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml index ef1ef99..3dabac3 100644 --- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.xml @@ -31,6 +31,28 @@ PythonCorrelate(invocation=[org$apache$flink$table$planner$utils$MockPythonTable ]]> </Resource> </TestCase> + <TestCase name="testCorrelateTuple12"> + <Resource name="sql"> + <![CDATA[ +SELECT * +FROM MyTable, LATERAL TABLE(func1(c)) AS T +]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(a=[$0], b=[$1], c=[$2], f0=[$3], f1=[$4], f2=[$5], f3=[$6], f4=[$7], f5=[$8], f6=[$9], f7=[$10], f8=[$11], f9=[$12], f10=[$13], f11=[$14]) ++- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{2}]) + :- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]]) + +- LogicalTableFunctionScan(invocation=[func1($cor0.c)], rowType=[*org.apache.flink.api.java.tuple.Tuple12* NOT NULL]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +Correlate(invocation=[func1($cor0.c)], correlate=[table(func1($cor0.c))], select=[a,b,c,f0,f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f11], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0, VARCHAR(2147483647) f1, VARCHAR(2147483647) f2, VARCHAR(2147483647) f3, VARCHAR(2147483647) f4, VARCHAR(2147483647) f5, INTEGER f6, INTEGER f7, INTEGER f8, INTEGER f9, INTEGER f10, INTEGER f11)], joinType=[INNER]) ++- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) +]]> + </Resource> + </TestCase> <TestCase name="testCorrelateWithMultiFilter"> <Resource name="ast"> <![CDATA[ diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala index 789b089..8ce336e 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/CorrelateTest.scala @@ -21,6 +21,7 @@ import org.apache.flink.api.scala._ import org.apache.flink.table.api._ import org.apache.flink.table.planner.expressions.utils.Func13 import org.apache.flink.table.planner.plan.optimize.program.FlinkStreamProgram +import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions.JavaTableFuncTuple12 import org.apache.flink.table.planner.utils._ import org.apache.calcite.rel.rules.CoreRules @@ -188,4 +189,19 @@ class CorrelateTest extends TableTestBase { util.verifyExecPlan(result) } + + @Test + def testCorrelateTuple12(): Unit = { + val util = streamTestUtil() + util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c) + val function = new JavaTableFuncTuple12 + util.addTemporarySystemFunction("func1", function) + val sql = + """ + |SELECT * + |FROM MyTable, LATERAL TABLE(func1(c)) AS T + |""".stripMargin + + util.verifyExecPlan(sql) + } }