[ 
https://issues.apache.org/jira/browse/FLINK-34529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17824368#comment-17824368
 ] 

yisha zhou commented on FLINK-34529:
------------------------------------

Hi [~libenchao], [~lincoln.86xy], I have some updates on the solution. I 
plan to:
 # add `ProjectWindowTransposeRule` to PROJECT_RULES (used in 
`PROJECT_REWRITE` and `LOGICAL`),
 # remove `CalcRankTransposeRule` from `LOGICAL_REWRITE`.

For {*}Action 1{*}, there is an optimization regression in one test. I'm not 
sure whether we can ignore it and optimize this case some other way. The test 
is `RankTest#testRankWithAnotherRankAsInput`.

The original plan is:
{code:java}
Calc(select=[CAST(w0$o0 AS INTEGER) AS rn1, CAST(w0$o0_0 AS INTEGER) AS rn2])
+- Rank(strategy=[UpdateFastStrategy[0,2,3]], rankType=[ROW_NUMBER], 
rankRange=[rankStart=1, rankEnd=200], partitionBy=[a], orderBy=[b DESC], 
select=[a, b, c, w0$o0, w0$o0_0])
   +- Exchange(distribution=[hash[a]])
      +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
rankRange=[rankStart=1, rankEnd=100], partitionBy=[a, c], orderBy=[b DESC], 
select=[a, b, c, w0$o0])
         +- Exchange(distribution=[hash[a, c]])
            +- Calc(select=[a, b, c])
               +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime]) {code}
The changed plan is:
{code:java}
Calc(select=[CAST(w0$o0 AS INTEGER) AS rn1, CAST(w0$o0_0 AS INTEGER) AS rn2])
+- Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], 
rankRange=[rankStart=1, rankEnd=200], partitionBy=[a], orderBy=[b DESC], 
select=[a, b, w0$o0, w0$o0_0])
   +- Exchange(distribution=[hash[a]])
      +- Calc(select=[a, b, w0$o0])
         +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
rankRange=[rankStart=1, rankEnd=100], partitionBy=[a, c], orderBy=[b DESC], 
select=[a, b, c, w0$o0])
            +- Exchange(distribution=[hash[a, c]])
               +- Calc(select=[a, b, c])
                  +- DataStreamScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c, proctime, rowtime])
{code}
You can see that the strategy of the second Rank changed from 
`UpdateFastStrategy` to `RetractStrategy`. The reason is that field `c`, which 
is part of the input's primary key, is pruned, so the requirements of 
`UpdateFastStrategy` can no longer be met.
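
To make the reasoning concrete, here is a minimal sketch of the key check, not the planner's actual code. It assumes the first Rank's output key is (a, c, w0$o0), which is how I read the `[0,2,3]` in `UpdateFastStrategy[0,2,3]`. The class, enum and method names are made up for illustration, and the real planner checks more conditions than this.

```java
import java.util.List;
import java.util.Set;

public class RankStrategySketch {
    enum Strategy { UPDATE_FAST, RETRACT }

    // Hypothetical simplification: an updating input can only use the
    // update-fast path if every field of its key is still present in the
    // fields that reach the second Rank. Otherwise fall back to retract.
    static Strategy choose(Set<String> inputKey, List<String> fieldsSeenByRank) {
        return fieldsSeenByRank.containsAll(inputKey)
                ? Strategy.UPDATE_FAST
                : Strategy.RETRACT;
    }

    public static void main(String[] args) {
        // Assumed key of the first Rank's output: (a, c, w0$o0).
        Set<String> key = Set.of("a", "c", "w0$o0");

        // Original plan: the second Rank still sees c.
        System.out.println(choose(key, List.of("a", "b", "c", "w0$o0")));

        // Changed plan: Calc(select=[a, b, w0$o0]) pruned c before the second Rank.
        System.out.println(choose(key, List.of("a", "b", "w0$o0")));
    }
}
```

In this simplified model the first call keeps the update-fast path and the second falls back to retract, which matches the difference between the two plans.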

IMO, this is not a general query optimization but a query execution 
optimization, similar to how Join uses different implementations depending on 
the upsert keys of its input. Therefore, I think we can ignore this change 
here. WDYT?

For {*}Action 2{*}, I found that `CorrelateSortToRankRule` can translate a 
LogicalCorrelate into a LogicalRank without any LogicalWindow being used in 
the process. Therefore `ProjectWindowTransposeRule` cannot cover all cases of 
Rank and Project transposition, and we should keep `CalcRankTransposeRule`. I 
also tried moving this rule into the `LOGICAL` stage, but it didn't work well.

The root cause is that the costs of Exchange and Rank do not depend on the 
length of the output row, so adding a Calc before Rank may cost more and is 
not chosen by the planner. I'll keep `CalcRankTransposeRule` in 
`LOGICAL_REWRITE` until the costs of Rank and Calc are optimized in a future 
PR.
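
A toy comparison of why the planner rejects the extra Calc, as a sketch only. The per-row weights and both cost functions are invented for illustration and are not Flink's cost model. The point is just that a width-blind cost can only grow when a Calc is added, while a width-aware cost could favor it.

```java
public class CalcBeforeRankCostSketch {
    // Hypothetical per-row weights; illustrative numbers only.
    static final double CALC = 1.0;
    static final double EXCHANGE = 4.0;
    static final double RANK = 10.0;

    // Width-blind model: Exchange and Rank cost depends on row count only,
    // so a Calc in front of them is pure extra cost.
    static double widthBlindCost(double rows, boolean withCalc) {
        double cost = rows * (EXCHANGE + RANK);
        return withCalc ? cost + rows * CALC : cost;
    }

    // Width-aware model: Exchange and Rank cost scales with the number of
    // fields they carry, so pruning fields first can pay for the Calc.
    static double widthAwareCost(double rows, int inputWidth, int prunedWidth,
                                 boolean withCalc) {
        int width = withCalc ? prunedWidth : inputWidth;
        double cost = rows * width * (EXCHANGE + RANK);
        return withCalc ? cost + rows * inputWidth * CALC : cost;
    }

    public static void main(String[] args) {
        double rows = 1000;
        System.out.println("width-blind, no Calc:   " + widthBlindCost(rows, false));
        System.out.println("width-blind, with Calc: " + widthBlindCost(rows, true));
        System.out.println("width-aware, no Calc:   " + widthAwareCost(rows, 6, 3, false));
        System.out.println("width-aware, with Calc: " + widthAwareCost(rows, 6, 3, true));
    }
}
```

Under the width-blind model the plan with the Calc is always more expensive, which is the behavior I observed in the `LOGICAL` stage.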

I look forward to your opinions on these two updates.

> Projection cannot be pushed down through rank operator.
> -------------------------------------------------------
>
>                 Key: FLINK-34529
>                 URL: https://issues.apache.org/jira/browse/FLINK-34529
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.19.0
>            Reporter: yisha zhou
>            Assignee: yisha zhou
>            Priority: Major
>
> When there is a rank/deduplicate operator, a projection on the output of 
> this operator cannot be pushed down to its input.
> The following code reproduces the issue:
> {code:java}
> val util = streamTestUtil() 
> util.addTableSource[(String, Int, String)]("T1", 'a, 'b, 'c)
> util.addTableSource[(String, Int, String)]("T2", 'd, 'e, 'f)
> val sql =
>   """
>     |SELECT a FROM (
>     |  SELECT a, f,
>     |      ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) as rank_num
>     |  FROM  T1, T2
>     |  WHERE T1.a = T2.d
>     |)
>     |WHERE rank_num = 1
>   """.stripMargin
> util.verifyPlan(sql){code}
> The plan is expected to be:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], 
> select=[a, c, f])
>    +- Exchange(distribution=[hash[f]])
>       +- Calc(select=[a, c, f])
>          +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, c, d, f], 
> leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>             :- Exchange(distribution=[hash[a]])
>             :  +- Calc(select=[a, c])
>             :     +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
>             +- Exchange(distribution=[hash[d]])
>                +- Calc(select=[d, f])
>                   +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) 
> {code}
> Notice that the 'select' of the Join operator is [a, c, d, f]. However, the 
> actual plan is:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], 
> select=[a, c, f])
>    +- Exchange(distribution=[hash[f]])
>       +- Calc(select=[a, c, f])
>          +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, 
> e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>             :- Exchange(distribution=[hash[a]])
>             :  +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
>             +- Exchange(distribution=[hash[d]])
>                +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
>  {code}
> The 'select' of the Join operator is [a, b, c, d, e, f], which means the 
> projection in the final Calc is not pushed through the Rank.
> I think an easy way to fix this issue is to add 
> org.apache.calcite.rel.rules.ProjectWindowTransposeRule to 
> FlinkStreamRuleSets.LOGICAL_OPT_RULES.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
