[
https://issues.apache.org/jira/browse/FLINK-21725?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-21725:
-----------------------------------
Labels: pull-request-available (was: )
> DataTypeExtractor extracts wrong fields ordering for Tuple12
> ------------------------------------------------------------
>
> Key: FLINK-21725
> URL: https://issues.apache.org/jira/browse/FLINK-21725
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.11.3, 1.12.2, 1.13.0
> Reporter: Jark Wu
> Assignee: Timo Walther
> Priority: Critical
> Labels: pull-request-available
>
> The following test can reproduce the problem:
> {code:java}
> /** Emit Tuple12 result. */
> public static class JavaTableFuncTuple12
>         extends TableFunction<
>                 Tuple12<
>                         String,
>                         String,
>                         String,
>                         String,
>                         String,
>                         String,
>                         Integer,
>                         Integer,
>                         Integer,
>                         Integer,
>                         Integer,
>                         Integer>> {
>     private static final long serialVersionUID = -8258882510989374448L;
>
>     public void eval(String str) {
>         collect(
>                 Tuple12.of(
>                         str + "_a",
>                         str + "_b",
>                         str + "_c",
>                         str + "_d",
>                         str + "_e",
>                         str + "_f",
>                         str.length(),
>                         str.length() + 1,
>                         str.length() + 2,
>                         str.length() + 3,
>                         str.length() + 4,
>                         str.length() + 5));
>     }
> }
> {code}
> {code:scala}
> @Test
> def testCorrelateTuple12(): Unit = {
>   val util = streamTestUtil()
>   util.addTableSource[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
>   val function = new JavaTableFuncTuple12
>   util.addTemporarySystemFunction("func1", function)
>   val sql =
>     """
>       |SELECT *
>       |FROM MyTable, LATERAL TABLE(func1(c)) AS T
>       |""".stripMargin
>   util.verifyExecPlan(sql)
> }
> {code}
> {code}
> // output plan
> Correlate(invocation=[func1($cor0.c)], correlate=[table(func1($cor0.c))],
> select=[a,b,c,f0,f1,f10,f11,f2,f3,f4,f5,f6,f7,f8,f9],
> rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c,
> VARCHAR(2147483647) f0, VARCHAR(2147483647) f1, INTEGER f10, INTEGER f11,
> VARCHAR(2147483647) f2, VARCHAR(2147483647) f3, VARCHAR(2147483647) f4,
> VARCHAR(2147483647) f5, INTEGER f6, INTEGER f7, INTEGER f8, INTEGER f9)],
> joinType=[INNER])
> +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable,
> source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> {code}
> Note that the problem does not occur when the function is registered with
> the legacy {{tEnv.registerFunction}}, because that path uses
> {{TypeInformation}}. It does occur with {{tEnv.createTemporaryFunction}} or
> the {{CREATE FUNCTION}} syntax, because those use {{TypeInference}}.
> Note that this problem exists in the latest 1.11 and 1.12 releases and on
> the master branch.
> I think the problem might lie in this line:
> https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/DataTypeExtractor.java#L562
> because it orders field names alphabetically, which places f10 and f11
> before f2, as seen in the output plan.
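
The ordering in the output plan can be reproduced without Flink. The following is a minimal, standalone sketch, not the code in DataTypeExtractor and not the fix for this issue. It only shows that sorting the Tuple12 field names f0 to f11 lexicographically yields the order in the plan. The class FieldOrderDemo and its methods are hypothetical names introduced for illustration, and the index-based comparator is one assumed way to restore positional order, not necessarily what the fix does.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Hypothetical demo class; it is not part of Flink. */
public class FieldOrderDemo {

    /** Builds the tuple field names f0 .. f(arity-1) in positional order. */
    public static List<String> tupleFieldNames(int arity) {
        return IntStream.range(0, arity)
                .mapToObj(i -> "f" + i)
                .collect(Collectors.toList());
    }

    /** Sorts a copy of the names lexicographically (plain String ordering). */
    public static List<String> sortedAlphabetically(List<String> names) {
        List<String> copy = new ArrayList<>(names);
        copy.sort(Comparator.naturalOrder());
        return copy;
    }

    /**
     * Assumed alternative for illustration: sorts a copy of the names by the
     * numeric suffix after the leading 'f', which restores positional order.
     */
    public static List<String> sortedByIndex(List<String> names) {
        List<String> copy = new ArrayList<>(names);
        copy.sort(Comparator.comparingInt((String n) -> Integer.parseInt(n.substring(1))));
        return copy;
    }

    public static void main(String[] args) {
        List<String> names = tupleFieldNames(12);
        // prints f0,f1,f10,f11,f2,f3,f4,f5,f6,f7,f8,f9
        System.out.println(String.join(",", sortedAlphabetically(names)));
        // prints f0,f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f11
        System.out.println(String.join(",", sortedByIndex(names)));
    }
}
```

The first printed line matches the f0..f9 portion of the select list in the plan above, which is consistent with the suspicion that the field names are being sorted as strings. Tuples with at most ten fields are unaffected by such a sort, because f0 to f9 are already in lexicographic order.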