[ 
https://issues.apache.org/jira/browse/FLINK-32320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17731823#comment-17731823
 ] 

Aitozi commented on FLINK-32320:
--------------------------------

In production, multi sink job are very common, if the table from UDTF is 
queried multi times, it will cause the function to be executed multi times(as 
shown in the plan). This will lead to bad performance. 

After some research, I think it's should be caused by: During sqlToRel process, 
the table function sqlNode will be `toRel` multi times and leads to different 
correlationId.


> Same correlate can not be reused due to the different correlationId
> -------------------------------------------------------------------
>
>                 Key: FLINK-32320
>                 URL: https://issues.apache.org/jira/browse/FLINK-32320
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>            Reporter: Aitozi
>            Priority: Major
>
> As describe in SubplanReuserTest
> {code:java}
>   @Test
>   def testSubplanReuseOnCorrelate(): Unit = {
>     util.addFunction("str_split", new StringSplit())
>     val sqlQuery =
>       """
>         |WITH r AS (SELECT a, b, c, v FROM x, LATERAL TABLE(str_split(c, 
> '-')) AS T(v))
>         |SELECT * FROM r r1, r r2 WHERE r1.v = r2.v
>       """.stripMargin
>     // TODO the sub-plan of Correlate should be reused,
>     // however the digests of Correlates are different
>     util.verifyExecPlan(sqlQuery)
>   }
> {code}
> This will produce the plan 
> {code:java}
> HashJoin(joinType=[InnerJoin], where=[(f0 = f00)], select=[a, b, c, f0, a0, 
> b0, c0, f00], build=[right])
> :- Exchange(distribution=[hash[f0]])
> :  +- Correlate(invocation=[str_split($cor0.c, _UTF-16LE'-')], 
> correlate=[table(str_split($cor0.c,'-'))], select=[a,b,c,f0], 
> rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
> VARCHAR(2147483647) f0)], joinType=[INNER])
> :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, 
> source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> +- Exchange(distribution=[hash[f0]])
>    +- Correlate(invocation=[str_split($cor1.c, _UTF-16LE'-')], 
> correlate=[table(str_split($cor1.c,'-'))], select=[a,b,c,f0], 
> rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
> VARCHAR(2147483647) f0)], joinType=[INNER])
>       +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, 
> source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
> {code}
> The Correlate node can not be reused due to the different correlation id.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to