Aitozi created FLINK-32320:
------------------------------
Summary: Same correlate can not be reused due to the different correlationId
Key: FLINK-32320
URL: https://issues.apache.org/jira/browse/FLINK-32320
Project: Flink
Issue Type: Improvement
Components: Table SQL / Planner
Reporter: Aitozi
As described in SubplanReuserTest:
{code:java}
@Test
def testSubplanReuseOnCorrelate(): Unit = {
  util.addFunction("str_split", new StringSplit())
  val sqlQuery =
    """
      |WITH r AS (SELECT a, b, c, v FROM x, LATERAL TABLE(str_split(c, '-')) AS T(v))
      |SELECT * FROM r r1, r r2 WHERE r1.v = r2.v
    """.stripMargin
  // TODO the sub-plan of Correlate should be reused,
  // however the digests of Correlates are different
  util.verifyExecPlan(sqlQuery)
}
{code}
This produces the following plan:
{code:java}
HashJoin(joinType=[InnerJoin], where=[(f0 = f00)], select=[a, b, c, f0, a0, b0, c0, f00], build=[right])
:- Exchange(distribution=[hash[f0]])
:  +- Correlate(invocation=[str_split($cor0.c, _UTF-16LE'-')], correlate=[table(str_split($cor0.c,'-'))], select=[a,b,c,f0], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER])
:     +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
+- Exchange(distribution=[hash[f0]])
   +- Correlate(invocation=[str_split($cor1.c, _UTF-16LE'-')], correlate=[table(str_split($cor1.c,'-'))], select=[a,b,c,f0], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)], joinType=[INNER])
      +- LegacyTableSourceScan(table=[[default_catalog, default_database, x, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
{code}
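The two Correlate nodes are identical except for their correlation ids ($cor0 vs. $cor1). Because the id is part of each node's digest, the digests differ and the Correlate sub-plan is not reused.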
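
One possible direction, shown here only as a rough sketch and not as the planner's actual implementation, is to compare digests after renumbering correlation variables by order of first appearance. The class `CorrelationIdNormalizer` and its `normalize` method are hypothetical names, and the sketch works on plain digest strings (shortened from the plan above) rather than on real RelNode objects:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink: illustrates the idea on digest strings only.
final class CorrelationIdNormalizer {

    // Matches correlation variables such as $cor0 or $cor1, as printed in the plan.
    private static final Pattern COR_VAR = Pattern.compile("\\$cor(\\d+)");

    // Renumbers correlation ids by order of first appearance, starting at 0.
    static String normalize(String digest) {
        Map<String, Integer> renumbered = new HashMap<>();
        Matcher m = COR_VAR.matcher(digest);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            Integer id = renumbered.get(m.group(1));
            if (id == null) {
                id = renumbered.size();
                renumbered.put(m.group(1), id);
            }
            // quoteReplacement keeps the literal '$' from being read as a group reference.
            m.appendReplacement(out, Matcher.quoteReplacement("$cor" + id));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        // Shortened versions of the two Correlate digests from the plan.
        String left = "Correlate(invocation=[str_split($cor0.c, _UTF-16LE'-')])";
        String right = "Correlate(invocation=[str_split($cor1.c, _UTF-16LE'-')])";
        System.out.println(left.equals(right));                       // false
        System.out.println(normalize(left).equals(normalize(right))); // true
    }
}
```

Whether such a normalization is safe inside the planner is an open question: it assumes the correlation variable is referenced only within the sub-plan being compared.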