yisha zhou created FLINK-34529:
----------------------------------

             Summary: Projection cannot be pushed down through rank operator.
                 Key: FLINK-34529
                 URL: https://issues.apache.org/jira/browse/FLINK-34529
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.19.0
            Reporter: yisha zhou

When a plan contains a rank/deduplicate operator, a projection on the output of that operator cannot be pushed down to its input. The following code reproduces the issue:
{code:scala}
// Register two three-column test sources.
val util = streamTestUtil()
util.addTableSource[(String, Int, String)]("T1", 'a, 'b, 'c)
util.addTableSource[(String, Int, String)]("T2", 'd, 'e, 'f)
// Only column a is selected on top of the ROW_NUMBER() filter.
val sql =
  """
    |SELECT a FROM (
    |  SELECT a, f,
    |      ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) as rank_num
    |  FROM T1, T2
    |  WHERE T1.a = T2.d
    |)
    |WHERE rank_num = 1
  """.stripMargin
util.verifyPlan(sql)
{code}
The expected plan is:
{code:java}
Calc(select=[a])
+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], select=[a, c, f])
   +- Exchange(distribution=[hash[f]])
      +- Calc(select=[a, c, f])
         +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, c, d, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
            :- Exchange(distribution=[hash[a]])
            :  +- Calc(select=[a, c])
            :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
            +- Exchange(distribution=[hash[d]])
               +- Calc(select=[d, f])
                  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
{code}
Note that the 'select' of the Join operator is [a, c, d, f].

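To illustrate why [a, c, d, f] is the field set the Join needs in the expected plan, the required columns can be derived top-down from the final projection. The following is only a toy sketch of that reasoning, not Flink or Calcite code: the object `ColumnPruningSketch` and its helper functions are hypothetical names introduced for this example, and fields are sorted alphabetically purely to keep the result stable.

```scala
// Toy model of top-down column pruning. This is NOT planner code; all names
// here are hypothetical and exist only to illustrate the expected plan.
object ColumnPruningSketch {

  // A Rank must read what its parent needs plus its own partition and sort keys.
  def requiredByRank(
      parentFields: Seq[String],
      partitionBy: Seq[String],
      orderBy: Seq[String]): Seq[String] =
    (parentFields ++ partitionBy ++ orderBy).distinct.sorted

  // A Join must produce what its parent needs plus the columns in its condition.
  def requiredByJoin(
      parentFields: Seq[String],
      conditionFields: Seq[String]): Seq[String] =
    (parentFields ++ conditionFields).distinct.sorted

  // Keep only the required fields that a given input table actually provides.
  def pruneInput(required: Seq[String], tableFields: Seq[String]): Seq[String] =
    tableFields.filter(f => required.contains(f))

  // Final Calc selects only a; Rank partitions by f and orders by c.
  val rankInput: Seq[String] =
    requiredByRank(Seq("a"), partitionBy = Seq("f"), orderBy = Seq("c"))

  // The join condition is a = d.
  val joinOutput: Seq[String] =
    requiredByJoin(rankInput, conditionFields = Seq("a", "d"))

  // Fields each source scan needs to keep after pruning.
  val t1Fields: Seq[String] = pruneInput(joinOutput, Seq("a", "b", "c"))
  val t2Fields: Seq[String] = pruneInput(joinOutput, Seq("d", "e", "f"))
}
```

Under these assumptions, `rankInput` is [a, c, f], `joinOutput` is [a, c, d, f], and the two inputs reduce to [a, c] and [d, f], which matches the Calc nodes below the Join in the expected plan.
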
However, the actual plan is:
{code:java}
Calc(select=[a])
+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], select=[a, c, f])
   +- Exchange(distribution=[hash[f]])
      +- Calc(select=[a, c, f])
         +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
            :- Exchange(distribution=[hash[a]])
            :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
            +- Exchange(distribution=[hash[d]])
               +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
{code}
Here the 'select' of the Join operator is [a, b, c, d, e, f], which means the projection in the final Calc is not pushed down through the Rank: the unused columns b and e are still read from the sources and carried through the Join.

I think an easy way to fix this issue is to add org.apache.calcite.rel.rules.ProjectWindowTransposeRule to FlinkStreamRuleSets.LOGICAL_OPT_RULES.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)