[ https://issues.apache.org/jira/browse/FLINK-34529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17824368#comment-17824368 ]
yisha zhou commented on FLINK-34529:
------------------------------------

Hi, [~libenchao], [~lincoln.86xy], I have some updates on the solution. I plan to:
# add `ProjectWindowTransposeRule` to PROJECT_RULES (used in `PROJECT_REWRITE` and `LOGICAL`),
# remove `CalcRankTransposeRule` from `LOGICAL_REWRITE`.

For {*}Action 1{*}, there is an optimization regression in one test. I'm not sure whether we can ignore it and optimize that case in another way. The test is `RankTest#testRankWithAnotherRankAsInput`. The original plan is:
{code:java}
Calc(select=[CAST(w0$o0 AS INTEGER) AS rn1, CAST(w0$o0_0 AS INTEGER) AS rn2])
+- Rank(strategy=[UpdateFastStrategy[0,2,3]], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=200], partitionBy=[a], orderBy=[b DESC], select=[a, b, c, w0$o0, w0$o0_0])
   +- Exchange(distribution=[hash[a]])
      +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], partitionBy=[a, c], orderBy=[b DESC], select=[a, b, c, w0$o0])
         +- Exchange(distribution=[hash[a, c]])
            +- Calc(select=[a, b, c])
               +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
{code}
The changed plan is:
{code:java}
Calc(select=[CAST(w0$o0 AS INTEGER) AS rn1, CAST(w0$o0_0 AS INTEGER) AS rn2])
+- Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=200], partitionBy=[a], orderBy=[b DESC], select=[a, b, w0$o0, w0$o0_0])
   +- Exchange(distribution=[hash[a]])
      +- Calc(select=[a, b, w0$o0])
         +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100], partitionBy=[a, c], orderBy=[b DESC], select=[a, b, c, w0$o0])
            +- Exchange(distribution=[hash[a, c]])
               +- Calc(select=[a, b, c])
                  +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
{code}
You can see that the strategy of the second Rank changed from `UpdateFastStrategy` to `RetractStrategy`.
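To make the strategy change easier to follow, here is a rough, hypothetical model of the decision. The class and method names are made up for illustration and are not Flink's planner API. It assumes that, for an updating input, `UpdateFastStrategy` needs an upsert key of the input that contains the partition key, and that the planner otherwise falls back to `RetractStrategy`; the real selection also checks the monotonicity of the order-by fields, which is left out here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical, simplified model of why the second Rank's strategy changes.
 * Assumption: for an updating input, UpdateFastStrategy needs an upsert key of the
 * input that contains the partition key; otherwise RetractStrategy is used.
 * The order-by monotonicity check done by the real planner is omitted.
 */
public class RankStrategySketch {

    /** Keeps only the upsert keys whose fields all survive the projection. */
    static List<Set<String>> keysAfterProjection(List<Set<String>> inputKeys, List<String> projectedFields) {
        List<Set<String>> surviving = new ArrayList<>();
        for (Set<String> key : inputKeys) {
            if (projectedFields.containsAll(key)) {
                surviving.add(key);
            }
        }
        return surviving;
    }

    /** Chooses a strategy name for a Rank over an updating input. */
    static String chooseStrategy(List<Set<String>> upsertKeys, Set<String> partitionKey) {
        for (Set<String> key : upsertKeys) {
            if (key.containsAll(partitionKey)) {
                return "UpdateFastStrategy";
            }
        }
        return "RetractStrategy";
    }

    public static void main(String[] args) {
        // Output of the first Rank: [a, b, c, w0$o0] with upsert key {a, c, w0$o0}.
        List<Set<String>> firstRankKeys = List.of(Set.of("a", "c", "w0$o0"));
        Set<String> partitionKey = Set.of("a"); // second Rank: partitionBy=[a]

        // Original plan: the second Rank reads all of [a, b, c, w0$o0].
        List<Set<String>> original = keysAfterProjection(firstRankKeys, List.of("a", "b", "c", "w0$o0"));
        System.out.println(chooseStrategy(original, partitionKey)); // UpdateFastStrategy

        // Changed plan: Calc(select=[a, b, w0$o0]) prunes c, so no upsert key survives.
        List<Set<String>> changed = keysAfterProjection(firstRankKeys, List.of("a", "b", "w0$o0"));
        System.out.println(chooseStrategy(changed, partitionKey)); // RetractStrategy
    }
}
```

With the key `{a, c, w0$o0}` intact the model picks `UpdateFastStrategy`; once `c` is pruned no key survives and it falls back to `RetractStrategy`, which matches the two plans.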
The reason is that field `c`, which is part of the input's primary key (`[0,2,3]` in `UpdateFastStrategy[0,2,3]`, i.e. `a, c, w0$o0`), is pruned, so the requirements of `UpdateFastStrategy` can no longer be met. IMO, this is not a general query optimization but an execution-level one, similar to how Join picks different implementations depending on the upsert keys of its inputs. Therefore, I think we can ignore this change here. WDYT?

For {*}Action 2{*}, I found that `CorrelateSortToRankRule` can translate a LogicalCorrelate into a LogicalRank without going through a LogicalWindow. Therefore `ProjectWindowTransposeRule` cannot cover all cases of transposing a Project and a Rank, and we should keep `CalcRankTransposeRule`. I also tried moving this rule into the `LOGICAL` stage, but it didn't work well. The root cause is that the costs of Exchange and Rank do not depend on the width of the output row, so a plan with an extra Calc before the Rank can look more expensive and is not chosen by the planner. I'll keep `CalcRankTransposeRule` in `LOGICAL_REWRITE` until the costs of Rank and Calc are improved in a future PR.

Looking forward to your opinions on these two updates.

> Projection cannot be pushed down through rank operator.
> -------------------------------------------------------
>
>                 Key: FLINK-34529
>                 URL: https://issues.apache.org/jira/browse/FLINK-34529
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.19.0
>            Reporter: yisha zhou
>            Assignee: yisha zhou
>            Priority: Major
>
> When there is a rank/deduplicate operator, a projection on the output of
> this operator cannot be pushed down to its input.
> The following code can help reproduce the issue:
> {code:java}
> val util = streamTestUtil()
> util.addTableSource[(String, Int, String)]("T1", 'a, 'b, 'c)
> util.addTableSource[(String, Int, String)]("T2", 'd, 'e, 'f)
> val sql =
>   """
>     |SELECT a FROM (
>     |  SELECT a, f,
>     |    ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) as rank_num
>     |  FROM T1, T2
>     |  WHERE T1.a = T2.d
>     |)
>     |WHERE rank_num = 1
>   """.stripMargin
> util.verifyPlan(sql)
> {code}
> The plan is expected to be:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], select=[a, c, f])
>    +- Exchange(distribution=[hash[f]])
>       +- Calc(select=[a, c, f])
>          +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, c, d, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>             :- Exchange(distribution=[hash[a]])
>             :  +- Calc(select=[a, c])
>             :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
>             +- Exchange(distribution=[hash[d]])
>                +- Calc(select=[d, f])
>                   +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
> {code}
> Notice that the 'select' of the Join operator is [a, c, d, f].
> However, the actual plan is:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], select=[a, c, f])
>    +- Exchange(distribution=[hash[f]])
>       +- Calc(select=[a, c, f])
>          +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>             :- Exchange(distribution=[hash[a]])
>             :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
>             +- Exchange(distribution=[hash[d]])
>                +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
> {code}
> Here the 'select' of the Join operator is [a, b, c, d, e, f], which means the
> projection in the final Calc is not pushed through the Rank.
> I think an easy way to fix this issue is to add
> org.apache.calcite.rel.rules.ProjectWindowTransposeRule to
> FlinkStreamRuleSets.LOGICAL_OPT_RULES.
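For reference, the pruning that the expected plan for the reproduction relies on can be sketched on field names alone. This is a hypothetical model (class and method names are invented for illustration), not `CalcRankTransposeRule` or `ProjectWindowTransposeRule` themselves. It assumes a Rank only needs the fields referenced by the projection above it plus its partition and order keys, and that the Join below additionally keeps its equi-join keys.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical sketch of the column pruning behind the expected plan.
 * Works on field names only; not Flink's or Calcite's actual rule code.
 */
public class RankPruningSketch {

    /** Returns the fields of inputFields that appear in needed, keeping input order. */
    static List<String> keepInOrder(List<String> inputFields, Set<String> needed) {
        List<String> kept = new ArrayList<>();
        for (String field : inputFields) {
            if (needed.contains(field)) {
                kept.add(field);
            }
        }
        return kept;
    }

    /** Fields a Rank needs from its input: the projection above it plus partition and order keys. */
    static List<String> rankInputFields(
            List<String> inputFields, List<String> projectedAbove, List<String> partitionBy, List<String> orderBy) {
        Set<String> needed = new HashSet<>(projectedAbove);
        needed.addAll(partitionBy);
        needed.addAll(orderBy);
        return keepInOrder(inputFields, needed);
    }

    public static void main(String[] args) {
        List<String> t1 = List.of("a", "b", "c");
        List<String> t2 = List.of("d", "e", "f");
        List<String> joinOutput = List.of("a", "b", "c", "d", "e", "f");

        // SELECT a ... ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC)
        List<String> rankInput = rankInputFields(joinOutput, List.of("a"), List.of("f"), List.of("c"));
        System.out.println(rankInput); // [a, c, f]

        // The Join must additionally keep its equi-join keys a and d.
        Set<String> joinNeeded = new HashSet<>(rankInput);
        joinNeeded.addAll(List.of("a", "d"));
        System.out.println(keepInOrder(joinOutput, joinNeeded)); // [a, c, d, f]

        // Each join input then only has to produce its share of those fields.
        System.out.println(keepInOrder(t1, joinNeeded)); // [a, c]
        System.out.println(keepInOrder(t2, joinNeeded)); // [d, f]
    }
}
```

The four results line up with the `select` lists in the expected plan: Rank input [a, c, f], Join [a, c, d, f], and the two source-side Calcs [a, c] and [d, f].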