[ https://issues.apache.org/jira/browse/FLINK-26051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17699929#comment-17699929 ]
Jane Chan commented on FLINK-26051: ----------------------------------- [~KristoffSC], Sorry for the late reply. Feel free to go ahead. Here are some investigations I made before; I hope that will help. By changing the CommonCalc#computeSelfCost to {code:scala} planner.getCostFactory.makeCost(newRowCnt, newRowCnt * (compCnt + 1), 0) {code} This issue can be fixed. Nonetheless, it will cause ~100 test plans to be changed. Over 50% of plans removed the Calc node that only contains projection. A few plans failed after applying this change. I think the leading blocker is evaluating the impact of this change to ensure there will be no performance regression for the affected tests. > one sql has row_number =1 and the subsequent SQL has "case when" and "where" > statement result Exception : The window can only be ordered in ASCENDING mode > ---------------------------------------------------------------------------------------------------------------------------------------------------------- > > Key: FLINK-26051 > URL: https://issues.apache.org/jira/browse/FLINK-26051 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.12.2, 1.14.4 > Reporter: chuncheng wu > Assignee: Jane Chan > Priority: Major > Attachments: image-2022-02-10-20-13-14-424.png, > image-2022-02-11-11-18-20-594.png, image-2022-06-17-21-28-54-886.png > > > hello, > i have 2 sqls. One sql (sql0) is "select xx from ( ROW_NUMBER statment) > where rn=1" and the other one (sql1) is "s{color:#505f79}elect ${fields} > from result where ${filter_conditions}{color}" . The fields quoted in sql1 > has one "case when" field .The two sql can work well seperately.but if they > combine it results the exception as follow . It happen in the occasion when > logical plan turn into physical plan : > > {code:java} > org.apache.flink.table.api.TableException: The window can only be ordered in > ASCENDING mode. > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:98) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:52) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregateBase.translateToPlan(StreamExecOverAggregateBase.scala:42) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66) > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65) > at > org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:103) > at > org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:42) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:630) > at > org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:582) > at > com.meituan.grocery.data.flink.test.BugTest.testRowNumber(BugTest.java:69) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:568) > at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725) > at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60){code} > In the stacktrace above , rownumber() 's physical rel which is > StreamExecRank In nomal change to StreamExecOverAggregate . The > StreamExecOverAggregate rel has a window= ROWS BETWEEN UNBOUNDED PRECEDING > AND CURRENT ROW which i never add .Oddly,if you remove the "case when" field > or the "where" statement in sql1 ,the program will work well, the exception > disappear. In the same time, rownumber() 's physical rel change back to > StreamExecRank. Its confusing me a lot > > example code : > {code:java} > @Test > public void testRowNumber() throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > EnvironmentSettings mySetting = EnvironmentSettings > .newInstance() > .useBlinkPlanner() > .inStreamingMode() > .build(); > StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); > env.setParallelism(1); > Configuration configuration = tableEnv.getConfig().getConfiguration(); > DataStream<Tuple12<String, Integer, Integer, String, Integer, > Integer, Integer, Integer, > Integer, String, String, String>> oriStream = > env.addSource(new CustomSourceRowNumber()); > Table testTable = tableEnv.fromDataStream( > oriStream, > $("biz_bill_no"), > $("task_type"), > $("task_mode"), > $("parent_task_no"), > $("total_stage_num"), > $("current_stage_index"), > $("use_pre_task_owner"), > $("poi_type"), > $("biz_origin_bill_type"), > $("sowing_task_no"), > $("dt"), > $("sowing_task_detail_id")); > tableEnv.createTemporaryView("wosOutSowingTaskDetail", testTable); > //sql0 ,select xx from ( ROW_NUMBER stament) where rn=1 > Table wosOutSowingTaskDetailLatest = tableEnv.sqlQuery( > "SELECT `biz_bill_no`\n" + > ",task_type\n" + > ",task_mode\n" + > ",parent_task_no\n" + > ",total_stage_num\n" + > ",current_stage_index\n" + > ",use_pre_task_owner\n" + > ",poi_type\n" + > ",biz_origin_bill_type\n" + > ",sowing_task_no\n" + > " FROM (\n" + > " SELECT *,\n" + > " ROW_NUMBER() OVER(PARTITION BY > dt,sowing_task_detail_id ORDER BY task_type desc) AS rn\n" > + > " FROM wosOutSowingTaskDetail\n" + > " ) tmp\n" + > " WHERE rn = 1"); > tableEnv.createTemporaryView("wosOutSowingTaskDetailLatest", > wosOutSowingTaskDetailLatest); > //sql , select ${fields} from result where ${filter_conditions} > //oddly,if we remove the "CASE WHEN" field ```"CASE WHEN task_mode > = 51 THEN parent_task_no\n" > //+ " WHEN task_mode = 40 AND total_stage_num >= 2 AND > current_stage_index >= 2 AND > //use_pre_task_owner = 1 THEN parent_task_no\n" > //+ " ELSE sowing_task_no END AS parent_task_no_cw\n"```,the > program will work well without it. > Table resultTable = tableEnv.sqlQuery("SELECT\n" + > "biz_bill_no\n" + > ", CASE WHEN task_mode = 51 THEN parent_task_no\n" + > " WHEN task_mode = 40 AND total_stage_num >= 2 AND > current_stage_index >= 2 AND use_pre_task_owner = 1 THEN parent_task_no\n" + > " ELSE sowing_task_no END AS parent_task_no_cw\n" + > ",parent_task_no" > + ",sowing_task_no, " > + "task_type, task_mode, " > + "total_stage_num, " > + "current_stage_index," > + "use_pre_task_owner \n" + > "FROM wosOutSowingTaskDetailLatest\n" + > "WHERE task_type = 21\n" + > "AND task_mode IN (51, 40)\n" + > "AND poi_type = 2\n" + > "AND biz_origin_bill_type not in (111,112,113,114)"); > > System.out.println(resultTable.explain()); > } > class CustomSourceRowNumber implements SourceFunction<Tuple12<String, > Integer, Integer, String, Integer, Integer, > Integer, Integer, > Integer, String, String, String>> { > private boolean isRuning = true; @Override > public void run( > SourceContext<Tuple12<String, Integer, Integer, String, > Integer, Integer, Integer, Integer, > Integer, String, String, String>> sourceContext) > throws Exception { > while (isRuning) { > sourceContext.collect(Tuple12.of("xxx", > 21, > 51, > "yyy", > 1, > 1, > 0, > 2, > 110, > "zzz", > "aaa", > "bbb")); > sourceContext.collect(Tuple12.of("xxx", > 21, > 40, > "yyy", > 2, > 2, > 1, > 2, > 110, > "zzz", > "aaa", > "bbb")); > Thread.sleep(Integer.MAX_VALUE); > } > } @Override > public void cancel() { > isRuning = false; > } > } {code} > System.out.println > {code:java} > SQL 0 Plan: > == Abstract Syntax Tree == > LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], > parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], > use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], > sowing_task_no=[$9]) > +- LogicalFilter(condition=[=($12, 1)]) > +- LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], > parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], > use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], > sowing_task_no=[$9], dt=[$10], sowing_task_detail_id=[$11], rn=[ROW_NUMBER() > OVER (PARTITION BY $10, $11 ORDER BY $1 DESC NULLS LAST)]) > +- LogicalTableScan(table=[[default_catalog, default_database, > wosOutSowingTaskDetail]]) > == Optimized Logical Plan == > Calc(select=[biz_bill_no, task_type, task_mode, parent_task_no, > total_stage_num, current_stage_index, use_pre_task_owner, poi_type, > biz_origin_bill_type, sowing_task_no], changelogMode=[I,UA,D]) > +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], > rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, sowing_task_detail_id], > orderBy=[task_type DESC], select=[biz_bill_no, task_type, task_mode, > parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, > poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id], > changelogMode=[I,UA,D]) > +- Exchange(distribution=[hash[dt, sowing_task_detail_id]], > changelogMode=[I]) > +- DataStreamScan(table=[[default_catalog, default_database, > wosOutSowingTaskDetail]], fields=[biz_bill_no, task_type, task_mode, > parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, > poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id], > changelogMode=[I]) > == Physical Execution Plan == > Stage 1 : Data Source > content : Source: Custom Source > Stage 7 : Operator > content : > SourceConversion(table=[default_catalog.default_database.wosOutSowingTaskDetail], > fields=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, > current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, > sowing_task_no, dt, sowing_task_detail_id]) > ship_strategy : FORWARD > Stage 9 : Operator > content : Rank(strategy=[AppendFastStrategy], > rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, > sowing_task_detail_id], orderBy=[task_type DESC], select=[biz_bill_no, > task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, > use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, > sowing_task_detail_id]) > ship_strategy : HASH > Stage 10 : Operator > content : Calc(select=[biz_bill_no, task_type, task_mode, > parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, > poi_type, biz_origin_bill_type, sowing_task_no]) > ship_strategy : FORWARD > SQL 1 Plan: > org.apache.flink.table.api.TableException: The window can only be ordered in > ASCENDING mode.{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)