[ 
https://issues.apache.org/jira/browse/FLINK-34529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17826918#comment-17826918
 ] 

Benchao Li commented on FLINK-34529:
------------------------------------

[~nilerzhou] Thanks for the update. 

For #1, is there a way to preserve 'primary key' fields for {{Rank}}? Usually 
an regression is not acceptable.

For #2, I agree. 

And putting all these together, I'm just thinking that can we just improve 
current case without introducing the general {{ProjectWindowTransposeRule}}, 
since it does not take 'primary key' into account and we'll end up with an 
regression. Putting more transposing rules into 'default rewrite' phase might 
be the minimal step to move forward without any regression. What do you think?

> Projection cannot be pushed down through rank operator.
> -------------------------------------------------------
>
>                 Key: FLINK-34529
>                 URL: https://issues.apache.org/jira/browse/FLINK-34529
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.19.0
>            Reporter: yisha zhou
>            Assignee: yisha zhou
>            Priority: Major
>
> When there is a rank/deduplicate operator, the projection based on output of 
> this operator cannot be pushed down to the input of it.
> The following code can help reproducing the issue:
> {code:java}
> val util = streamTestUtil() 
> util.addTableSource[(String, Int, String)]("T1", 'a, 'b, 'c)
> util.addTableSource[(String, Int, String)]("T2", 'd, 'e, 'f)
> val sql =
>   """
>     |SELECT a FROM (
>     |  SELECT a, f,
>     |      ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) as rank_num
>     |  FROM  T1, T2
>     |  WHERE T1.a = T2.d
>     |)
>     |WHERE rank_num = 1
>   """.stripMargin
> util.verifyPlan(sql){code}
> The plan is expected to be:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], 
> select=[a, c, f])
>    +- Exchange(distribution=[hash[f]])
>       +- Calc(select=[a, c, f])
>          +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, c, d, f], 
> leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>             :- Exchange(distribution=[hash[a]])
>             :  +- Calc(select=[a, c])
>             :     +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
>             +- Exchange(distribution=[hash[d]])
>                +- Calc(select=[d, f])
>                   +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f]) 
> {code}
> Notice that the 'select' of Join operator is [a, c, d, f]. However the actual 
> plan is:
> {code:java}
> Calc(select=[a])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], 
> rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], 
> select=[a, c, f])
>    +- Exchange(distribution=[hash[f]])
>       +- Calc(select=[a, c, f])
>          +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, 
> e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
>             :- Exchange(distribution=[hash[a]])
>             :  +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
>             +- Exchange(distribution=[hash[d]])
>                +- LegacyTableSourceScan(table=[[default_catalog, 
> default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
>  {code}
> the 'select' of Join operator is [a, b, c, d, e, f], which means the 
> projection in the final Calc is not passed through the Rank.
> And I think an easy way to fix this issue is to add 
> org.apache.calcite.rel.rules.ProjectWindowTransposeRule into 
> FlinkStreamRuleSets.LOGICAL_OPT_RULES.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to