godfrey he created FLINK-27272:
----------------------------------

             Summary: The plan for query with local sort is incorrect if adaptive batch scheduler is enabled
                 Key: FLINK-27272
                 URL: https://issues.apache.org/jira/browse/FLINK-27272
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.15.0
            Reporter: godfrey he


Add the following test case to ForwardHashExchangeTest:


{code:java}
    @Test
    public void testRankWithHashShuffle() {
        util.verifyExecPlan(
                "SELECT * FROM (SELECT a, b, RANK() OVER(PARTITION BY a ORDER BY b) rk "
                        + "FROM T) WHERE rk <= 10");
    }
{code}

The resulting plan is:

{code:java}
Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[b ASC], global=[true], select=[a, b, w0$o0])
+- Exchange(distribution=[forward])
   +- Sort(orderBy=[a ASC, b ASC])
      +- Exchange(distribution=[hash[a]])
         +- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[b ASC], global=[false], select=[a, b])
            +- Sort(orderBy=[a ASC, b ASC])
               +- TableSourceScan(table=[[default_catalog, default_database, T, project=[a, b], metadata=[]]], fields=[a, b])
{code}

There should be an additional {{Exchange(distribution=[forward])}} node between 
the local {{Rank}} and the {{Sort}} that feeds it. The local {{Rank}} and that 
{{Sort}} must run with the same parallelism. Without the forward exchange, if 
the adaptive batch scheduler is enabled but operator chaining is disabled, the 
scheduler may give the two operators different parallelisms, and the result 
may be wrong.
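
To illustrate why the two operators need the same parallelism, here is a minimal toy model in plain Java. It does not use any Flink API. The class name, the sample data, and the modulo-based assignment of upstream partitions to downstream subtasks are all assumptions made for illustration and are not how Flink wires subtasks internally. It only shows that a consumer which reads more than one locally sorted partition no longer sees sorted input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: plain Java lists stand in for subtask outputs; no Flink API is used.
class LocalSortParallelismSketch {

    // Hypothetical output of a local Sort running with parallelism 4:
    // every partition is sorted on its own.
    static List<List<Integer>> sortedPartitions() {
        return Arrays.asList(
                Arrays.asList(1, 4, 7),
                Arrays.asList(2, 5, 8),
                Arrays.asList(3, 6, 9),
                Arrays.asList(0, 10, 11));
    }

    // Assumed wiring: downstream subtask j reads every upstream partition i with
    // i % downstreamParallelism == j, one partition after another.
    // With equal parallelism this is a one-to-one (forward) connection.
    static List<List<Integer>> consume(List<List<Integer>> upstream, int downstreamParallelism) {
        List<List<Integer>> inputs = new ArrayList<>();
        for (int j = 0; j < downstreamParallelism; j++) {
            inputs.add(new ArrayList<>());
        }
        for (int i = 0; i < upstream.size(); i++) {
            inputs.get(i % downstreamParallelism).addAll(upstream.get(i));
        }
        return inputs;
    }

    // True if every downstream subtask still sees its input in sorted order.
    static boolean allSorted(List<List<Integer>> inputs) {
        for (List<Integer> rows : inputs) {
            for (int k = 1; k < rows.size(); k++) {
                if (rows.get(k - 1) > rows.get(k)) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<List<Integer>> upstream = sortedPartitions();
        // Same parallelism: each consumer reads exactly one sorted partition.
        System.out.println("4 -> 4 sorted: " + allSorted(consume(upstream, 4)));
        // Different parallelism: each consumer reads two sorted partitions back to back.
        System.out.println("4 -> 2 sorted: " + allSorted(consume(upstream, 2)));
    }
}
```

In this model the first line prints true and the second prints false. An operator such as the local {{Rank}}, which relies on sorted input, would then produce wrong results in the second case.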

Local sort aggregation has a similar problem.




--
This message was sent by Atlassian Jira
(v8.20.1#820001)
