Junning Liang created FLINK-20231:
-------------------------------------
Summary: Sql UDTF subplan reuse on correlate
Key: FLINK-20231
URL: https://issues.apache.org/jira/browse/FLINK-20231
Project: Flink
Issue Type: Improvement
Affects Versions: 1.10.1, 1.10.0
Reporter: Junning Liang
Hi all,
I would like to start a discussion about subplan reuse on Correlate.
When I wrote a test case for a UDTF with two sinks, I saw from the RelNode digests
that no node except TableSourceScan was reused. The code is shown below.
{code:sql}
CREATE VIEW tempTable1 AS SELECT name, age, habit, length FROM sources,
LATERAL TABLE(SplitStringUDTF(habits)) AS T(habit, length);
INSERT INTO sinks SELECT * FROM tempTable1;
INSERT INTO sinks1 SELECT * FROM tempTable1;
{code}
The RelNode digests of the two sinks are shown below.
{code:java}
Sink(name=[`default_catalog`.`default_database`.`sinks`], fields=[name, age,
habit, length], accMode=[Acc]), rowType=[RecordType:peek_no_expand(BOOLEAN f0,
RecordType:peek_no_expand(VARCHAR(2147483647) name, INTEGER age,
VARCHAR(2147483647) habit, INTEGER length) f1)]
Calc(select=[name, age, f0 AS habit, f1 AS length], accMode=[Acc]),
rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647)
habit, INTEGER length)]
Correlate(invocation=[SplitStringUDTF($cor1.habits)],
correlate=[table(default_catalog.default_database.SplitStringUDTF($cor1.habits))],
select=[name,age,habits,f0,f1], rowType=[RecordType(VARCHAR(2147483647) name,
INTEGER age, VARCHAR(2147483647) habits, VARCHAR(2147483647) f0, INTEGER f1)],
joinType=[INNER], accMode=[Acc]), rowType=[RecordType(VARCHAR(2147483647) name,
INTEGER age, VARCHAR(2147483647) habits, VARCHAR(2147483647) f0, INTEGER f1)]
TableSourceScan(table=[[default_catalog, default_database, sources,
source: [HDFSTableSource(name, age, habits)]]], fields=[name, age, habits],
accMode=[Acc]), rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age,
VARCHAR(2147483647) habits)]{code}
{code:java}
Sink(name=[`default_catalog`.`default_database`.`sinks1`], fields=[name, age,
habit, length], accMode=[Acc]), rowType=[RecordType:peek_no_expand(BOOLEAN f0,
RecordType:peek_no_expand(VARCHAR(2147483647) name, INTEGER age,
VARCHAR(2147483647) habit, INTEGER length) f1)]
Calc(select=[name, age, f0 AS habit, f1 AS length], accMode=[Acc]),
rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age, VARCHAR(2147483647)
habit, INTEGER length)]
Correlate(invocation=[SplitStringUDTF($cor2.habits)],
correlate=[table(default_catalog.default_database.SplitStringUDTF($cor2.habits))],
select=[name,age,habits,f0,f1], rowType=[RecordType(VARCHAR(2147483647) name,
INTEGER age, VARCHAR(2147483647) habits, VARCHAR(2147483647) f0, INTEGER f1)],
joinType=[INNER], accMode=[Acc]), rowType=[RecordType(VARCHAR(2147483647) name,
INTEGER age, VARCHAR(2147483647) habits, VARCHAR(2147483647) f0, INTEGER f1)]
TableSourceScan(table=[[default_catalog, default_database, sources,
source: [HDFSTableSource(name, age, habits)]]], fields=[name, age, habits],
accMode=[Acc]), rowType=[RecordType(VARCHAR(2147483647) name, INTEGER age,
VARCHAR(2147483647) habits)]
{code}
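As we can see, only the TableSourceScan plan was reused. The two Correlate digests
differ only in the correlation variable ($cor1 vs. $cor2). I also found a related
test in SubplanReuseTest.scala, but it has been marked as TODO since 2019.
I hope a solution can be proposed.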
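To make the failure mode concrete, here is a simplified model of digest-based reuse. It is a sketch only, not Flink's actual SubplanReuser: nodes whose digest strings are equal are treated as one reusable sub-plan. The class and method names are hypothetical, and the digests are shortened by hand from the dumps above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of digest-based sub-plan reuse; NOT Flink's SubplanReuser.
// Nodes whose digest strings are equal are treated as one reusable sub-plan.
public class DigestReuseSketch {

    // Groups node names by digest, preserving first-seen order.
    static Map<String, List<String>> groupByDigest(Map<String, String> nodeToDigest) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : nodeToDigest.entrySet()) {
            groups.computeIfAbsent(e.getValue(), k -> new ArrayList<>()).add(e.getKey());
        }
        return groups;
    }

    // Returns every digest shared by at least two nodes, i.e. the reusable sub-plans.
    static List<String> reusableDigests(Map<String, String> nodeToDigest) {
        List<String> reusable = new ArrayList<>();
        for (Map.Entry<String, List<String>> g : groupByDigest(nodeToDigest).entrySet()) {
            if (g.getValue().size() > 1) {
                reusable.add(g.getKey());
            }
        }
        return reusable;
    }

    public static void main(String[] args) {
        // Digests shortened by hand from the two plan dumps above.
        Map<String, String> nodes = new LinkedHashMap<>();
        nodes.put("sinks/scan", "TableSourceScan(table=[[default_catalog, default_database, sources]])");
        nodes.put("sinks1/scan", "TableSourceScan(table=[[default_catalog, default_database, sources]])");
        nodes.put("sinks/correlate", "Correlate(invocation=[SplitStringUDTF($cor1.habits)])");
        nodes.put("sinks1/correlate", "Correlate(invocation=[SplitStringUDTF($cor2.habits)])");
        // Prints only the TableSourceScan digest; the Correlates differ in $cor1/$cor2.
        System.out.println(reusableDigests(nodes));
    }
}
```

Since each digest in the dumps covers the whole subtree below a node, once the two Correlate digests differ, the Calc and Sink digests above them differ as well, which would explain why nothing above TableSourceScan is reused.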
{code:scala}
@Test def testSubplanReuseOnCorrelate(): Unit = {
  util.addFunction("str_split", new StringSplit())
  val sqlQuery =
    """
      |WITH r AS (SELECT a, b, c, v FROM x, LATERAL TABLE(str_split(c, '-')) AS T(v))
      |SELECT * FROM r r1, r r2 WHERE r1.v = r2.v
    """.stripMargin
  // TODO the sub-plan of Correlate should be reused,
  // however the digests of Correlates are different
  util.verifyPlan(sqlQuery)
}
{code}
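One possible direction for the TODO above, sketched under assumptions: canonicalize correlation variable names in a digest before comparing, so that digests differing only in the planner-assigned id ($cor1 vs. $cor2) compare equal. CorrelationIdNormalizer and normalize are hypothetical names, not part of Flink or Calcite. Ids are renumbered by first appearance, so distinct variables inside one digest stay distinct. This renaming is only sound when every correlation variable in the compared subtree is defined inside that subtree, as with the Correlate nodes here; a digest referencing a variable bound outside it would need more care.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink or Calcite: canonicalizes correlation
// variable names ($cor1, $cor2, ...) in a digest string before comparison.
public class CorrelationIdNormalizer {

    // "$cor" followed by the planner-assigned numeric id, e.g. "$cor12".
    private static final Pattern COR_ID = Pattern.compile("\\$cor\\d+");

    // Renumbers ids by order of first appearance: the first distinct id becomes
    // $cor0, the next $cor1, and so on. Distinct ids stay distinct.
    static String normalize(String digest) {
        Map<String, Integer> ordinals = new HashMap<>();
        Matcher m = COR_ID.matcher(digest);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            Integer ordinal = ordinals.get(m.group());
            if (ordinal == null) {
                ordinal = ordinals.size();
                ordinals.put(m.group(), ordinal);
            }
            // quoteReplacement keeps the '$' literal instead of a group reference.
            m.appendReplacement(out, Matcher.quoteReplacement("$cor" + ordinal));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        // Correlate digests shortened from the plan dumps in this issue.
        String d1 = "Correlate(invocation=[SplitStringUDTF($cor1.habits)], "
                + "correlate=[table(default_catalog.default_database.SplitStringUDTF($cor1.habits))])";
        String d2 = "Correlate(invocation=[SplitStringUDTF($cor2.habits)], "
                + "correlate=[table(default_catalog.default_database.SplitStringUDTF($cor2.habits))])";
        System.out.println(d1.equals(d2));                       // false
        System.out.println(normalize(d1).equals(normalize(d2))); // true
    }
}
```

Renumbering by first appearance, rather than replacing every id with one fixed placeholder, avoids merging two different correlation variables that happen to appear in the same digest.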
--
This message was sent by Atlassian Jira
(v8.3.4#803005)