godfrey he created FLINK-27272:
----------------------------------

             Summary: The plan for query with local sort is incorrect if adaptive batch scheduler is enabled
                 Key: FLINK-27272
                 URL: https://issues.apache.org/jira/browse/FLINK-27272
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.15.0
            Reporter: godfrey he
Add the following test case to {{ForwardHashExchangeTest}}:
{code:java}
@Test
public void testRankWithHashShuffle() {
    util.verifyExecPlan(
            "SELECT * FROM (SELECT a, b, RANK() OVER(PARTITION BY a ORDER BY b) rk FROM T) WHERE rk <= 10");
}
{code}
The resulting plan is:
{code:java}
Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[b ASC], global=[true], select=[a, b, w0$o0])
+- Exchange(distribution=[forward])
   +- Sort(orderBy=[a ASC, b ASC])
      +- Exchange(distribution=[hash[a]])
         +- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a], orderBy=[b ASC], global=[false], select=[a, b])
            +- Sort(orderBy=[a ASC, b ASC])
               +- TableSourceScan(table=[[default_catalog, default_database, T, project=[a, b], metadata=[]]], fields=[a, b])
{code}
There should be an additional {{Exchange(distribution=[forward])}} node between the local {{Rank}} and the {{Sort}} that feeds it. Otherwise, if the adaptive batch scheduler is enabled but operator chaining is disabled, the result may be wrong: the local {{Rank}} and its input {{Sort}} must run with the same parallelism, but without the forward exchange the adaptive batch scheduler may change their parallelism.

Local sort aggregation has a similar problem.
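To illustrate the intended plan shape, here is a minimal, hypothetical sketch. The {{PlanNode}} class, the operator names {{LocalRank}} / {{LocalSortAgg}}, and the {{addForwardExchanges}} pass are invented for this illustration; they are not Flink's planner or ExecNode API. The pass walks a simplified plan and inserts a forward exchange between a local operator that relies on sorted input and a {{Sort}} connected directly beneath it, which is the extra node the plan above is missing.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical, heavily simplified plan model (NOT Flink's API). It only shows
 * where a forward exchange would be inserted between a local operator and the
 * Sort that produces its sorted input.
 */
public class ForwardExchangeSketch {

    /** A plan node: an operator name plus its inputs. */
    static final class PlanNode {
        final String name;
        final List<PlanNode> inputs;

        PlanNode(String name, PlanNode... inputs) {
            this.name = name;
            this.inputs = new ArrayList<>(Arrays.asList(inputs));
        }
    }

    /** Assumption: these local operators consume the output of the Sort directly below them. */
    static boolean needsSortedInput(PlanNode node) {
        return node.name.equals("LocalRank") || node.name.equals("LocalSortAgg");
    }

    /**
     * Post-order pass: after rewriting each input, wrap a Sort that directly feeds a
     * sort-dependent local operator in a forward Exchange. Running it twice is a no-op,
     * because the operator's direct input is then the Exchange, not the Sort.
     */
    static PlanNode addForwardExchanges(PlanNode node) {
        for (int i = 0; i < node.inputs.size(); i++) {
            PlanNode input = addForwardExchanges(node.inputs.get(i));
            if (needsSortedInput(node) && input.name.equals("Sort")) {
                input = new PlanNode("Exchange(forward)", input);
            }
            node.inputs.set(i, input);
        }
        return node;
    }

    /** Renders the plan in the same "+- " tree style used by the plans in this issue. */
    static String explain(PlanNode node) {
        StringBuilder sb = new StringBuilder();
        explain(node, 0, sb);
        return sb.toString();
    }

    private static void explain(PlanNode node, int depth, StringBuilder sb) {
        if (depth > 0) {
            sb.append("   ".repeat(depth - 1)).append("+- ");
        }
        sb.append(node.name).append('\n');
        for (PlanNode input : node.inputs) {
            explain(input, depth + 1, sb);
        }
    }

    public static void main(String[] args) {
        // Simplified version of the plan reported above.
        PlanNode plan =
                new PlanNode("GlobalRank",
                        new PlanNode("Exchange(forward)",
                                new PlanNode("Sort",
                                        new PlanNode("Exchange(hash[a])",
                                                new PlanNode("LocalRank",
                                                        new PlanNode("Sort",
                                                                new PlanNode("TableSourceScan")))))));
        System.out.print(explain(addForwardExchanges(plan)));
    }
}
{code}

Running {{main}} prints the simplified plan with the extra forward exchange under the local rank:
{code}
GlobalRank
+- Exchange(forward)
   +- Sort
      +- Exchange(hash[a])
         +- LocalRank
            +- Exchange(forward)
               +- Sort
                  +- TableSourceScan
{code}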