yisha zhou created FLINK-34529:
----------------------------------

             Summary: Projection cannot be pushed down through rank operator.
                 Key: FLINK-34529
                 URL: https://issues.apache.org/jira/browse/FLINK-34529
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.19.0
            Reporter: yisha zhou


When a query contains a rank/deduplicate operator, a projection applied to the 
output of this operator cannot be pushed down to its input.

The following code reproduces the issue:
{code:java}
val util = streamTestUtil() 
util.addTableSource[(String, Int, String)]("T1", 'a, 'b, 'c)
util.addTableSource[(String, Int, String)]("T2", 'd, 'e, 'f)
val sql =
  """
    |SELECT a FROM (
    |  SELECT a, f,
    |      ROW_NUMBER() OVER (PARTITION BY f ORDER BY c DESC) as rank_num
    |  FROM  T1, T2
    |  WHERE T1.a = T2.d
    |)
    |WHERE rank_num = 1
  """.stripMargin

util.verifyPlan(sql)
{code}
The expected plan is:
{code:java}
Calc(select=[a])
+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], select=[a, c, f])
   +- Exchange(distribution=[hash[f]])
      +- Calc(select=[a, c, f])
         +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, c, d, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
            :- Exchange(distribution=[hash[a]])
            :  +- Calc(select=[a, c])
            :     +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
            +- Exchange(distribution=[hash[d]])
               +- Calc(select=[d, f])
                  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
{code}
Note that the 'select' of the Join operator is [a, c, d, f]. However, the actual 
plan is:
{code:java}
Calc(select=[a])
+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[f], orderBy=[c DESC], select=[a, c, f])
   +- Exchange(distribution=[hash[f]])
      +- Calc(select=[a, c, f])
         +- Join(joinType=[InnerJoin], where=[=(a, d)], select=[a, b, c, d, e, f], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
            :- Exchange(distribution=[hash[a]])
            :  +- LegacyTableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
            +- Exchange(distribution=[hash[d]])
               +- LegacyTableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(d, e, f)]]], fields=[d, e, f])
{code}
Here the 'select' of the Join operator is [a, b, c, d, e, f], which means the 
projection in the final Calc is not pushed down through the Rank.
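
The column sets in the expected plan follow from a simple required-field 
calculation. As a rough illustration only (plain Scala, not planner code; 
requiredRankInput and requiredJoinOutput are hypothetical helper names), the 
fields that must survive below the Rank are the projected fields plus the 
partition and order keys, and the Join additionally needs the fields of its 
own condition:
{code:scala}
object RequiredFields {
  // Fields the Rank must read from its input: whatever the parent Calc
  // projects, plus the partition keys and the order keys.
  def requiredRankInput(
      projected: Set[String],
      partitionBy: Set[String],
      orderBy: Set[String]): Set[String] =
    projected ++ partitionBy ++ orderBy

  // Fields the Join must handle: whatever is needed above it, plus the
  // fields referenced by the join condition.
  def requiredJoinOutput(neededAbove: Set[String], joinKeys: Set[String]): Set[String] =
    neededAbove ++ joinKeys

  def main(args: Array[String]): Unit = {
    // SELECT a ... PARTITION BY f ORDER BY c DESC
    val rankInput = requiredRankInput(Set("a"), Set("f"), Set("c"))
    // WHERE T1.a = T2.d
    val joinOutput = requiredJoinOutput(rankInput, Set("a", "d"))

    println(rankInput.toSeq.sorted.mkString("[", ", ", "]"))  // prints [a, c, f]
    println(joinOutput.toSeq.sorted.mkString("[", ", ", "]")) // prints [a, c, d, f]
  }
}
{code}
Columns b and e appear in neither set, which is why the expected plan can 
drop them before the Join.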

I think an easy way to fix this issue is to add 
org.apache.calcite.rel.rules.ProjectWindowTransposeRule to 
FlinkStreamRuleSets.LOGICAL_OPT_RULES.
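
A minimal sketch of what that suggestion could look like, under two 
assumptions that have not been verified against the planner: that the rule 
instance is available as CoreRules.PROJECT_WINDOW_TRANSPOSE in the Calcite 
version bundled with Flink, and that appending it to the existing rule set is 
enough for it to fire. The object and val names below are hypothetical; an 
actual fix would more likely add the rule to the list inside 
FlinkStreamRuleSets itself:
{code:scala}
import org.apache.calcite.rel.rules.CoreRules
import org.apache.calcite.tools.{RuleSet, RuleSets}
import org.apache.flink.table.planner.plan.rules.FlinkStreamRuleSets

import scala.collection.JavaConverters._

object RankProjectionPushDownSketch {
  // Hypothetical rule set: the existing logical optimization rules plus
  // Calcite's Project/Window transpose rule. Untested; requires the Flink
  // table planner and Calcite on the classpath.
  val LOGICAL_OPT_RULES_WITH_TRANSPOSE: RuleSet = RuleSets.ofList(
    (FlinkStreamRuleSets.LOGICAL_OPT_RULES.asScala ++
      Seq(CoreRules.PROJECT_WINDOW_TRANSPOSE)).asJava)
}
{code}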



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
