[ https://issues.apache.org/jira/browse/FLINK-26051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17490165#comment-17490165 ]
zhangbin edited comment on FLINK-26051 at 2/10/22, 1:42 PM:
------------------------------------------------------------

I tested this with Flink 1.12.2 in IDEA. The debug log shows that the logical plan handed to the FlinkLogicalRankRuleForRangeEnd rule differs depending on whether the query contains the CASE WHEN field.

With CASE WHEN, the logical plan before applying FlinkLogicalRankRuleForRangeEnd is:
{code}
FlinkLogicalCalc(select=[biz_bill_no, CASE(OR(=(task_mode, 51), AND(=(task_mode, 40), >=(total_stage_num, 2), >=(current_stage_index, 2), =(use_pre_task_owner, 1))), parent_task_no, sowing_task_no) AS parent_task_no_cw, parent_task_no, sowing_task_no, task_type, task_mode, total_stage_num, current_stage_index, use_pre_task_owner], where=[AND(SEARCH(w0$o0, Sarg[1L:BIGINT]:BIGINT), SEARCH(task_type, Sarg[21]), SEARCH(task_mode, Sarg[40, 51]), SEARCH(poi_type, Sarg[2]), SEARCH(biz_origin_bill_type, Sarg[(-∞..111), (111..112), (112..113), (113..114), (114..+∞)]))])
+- FlinkLogicalCalc(select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, w0$o0])
   +- FlinkLogicalOverAggregate(window#0=[window(partition {10, 11} order by [1 DESC-nulls-last] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
      +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, wosOutSowingTaskDetail]])
{code}
This plan does not match FlinkLogicalRankRuleForRangeEnd, but it does match FlinkCalcMergeRule. The optimize result after the FlinkLogicalRankRuleForRangeEnd phase therefore still contains the FlinkLogicalOverAggregate:
{code}
FlinkLogicalCalc(select=[biz_bill_no, CASE(OR(=(task_mode, 51), AND(=(task_mode, 40), >=(total_stage_num, 2), >=(current_stage_index, 2), =(use_pre_task_owner, 1))), parent_task_no, sowing_task_no) AS parent_task_no_cw, parent_task_no, sowing_task_no, task_type, task_mode, total_stage_num, current_stage_index, use_pre_task_owner], where=[AND(SEARCH(w0$o0, Sarg[1L:BIGINT]:BIGINT), SEARCH(task_type, Sarg[21]), SEARCH(task_mode, Sarg[40, 51]), SEARCH(poi_type, Sarg[2]), SEARCH(biz_origin_bill_type, Sarg[(-∞..111), (111..112), (112..113), (113..114), (114..+∞)]))])
+- FlinkLogicalOverAggregate(window#0=[window(partition {10, 11} order by [1 DESC-nulls-last] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
   +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, wosOutSowingTaskDetail]])
{code}

Without CASE WHEN, the plan before applying FlinkLogicalRankRuleForRangeEnd is:
{code}
FlinkLogicalCalc(select=[biz_bill_no, parent_task_no, sowing_task_no, task_type, task_mode, total_stage_num, current_stage_index, use_pre_task_owner], where=[AND(SEARCH(w0$o0, Sarg[1L:BIGINT]:BIGINT), SEARCH(task_type, Sarg[21]), SEARCH(task_mode, Sarg[40, 51]), SEARCH(poi_type, Sarg[2]), SEARCH(biz_origin_bill_type, Sarg[(-∞..111), (111..112), (112..113), (113..114), (114..+∞)]))])
+- FlinkLogicalOverAggregate(window#0=[window(partition {10, 11} order by [1 DESC-nulls-last] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])
   +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, wosOutSowingTaskDetail]])
{code}
This plan matches FlinkLogicalRankRuleForRangeEnd, and the optimize result uses FlinkLogicalRank:
{code}
FlinkLogicalCalc(select=[biz_bill_no, parent_task_no, sowing_task_no, task_type, task_mode, total_stage_num, current_stage_index, use_pre_task_owner], where=[AND(AND(AND(AND(AND(AND(=(task_type, 21), OR(=(task_mode, 40), =(task_mode, 51))), =(poi_type, 2)), <>(biz_origin_bill_type, 111)), <>(biz_origin_bill_type, 112)), <>(biz_origin_bill_type, 113)), <>(biz_origin_bill_type, 114))])
+- FlinkLogicalRank(rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt,sowing_task_detail_id], orderBy=[task_type DESC], select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id])
   +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, wosOutSowingTaskDetail]])
{code}

So the SQL with CASE WHEN is converted to FlinkLogicalOverAggregate, while the SQL without CASE WHEN is converted to FlinkLogicalRank.

The problem was solved when I moved FlinkCalcMergeRule.INSTANCE before FlinkLogicalRankRule.INSTANCE in the LOGICAL_REWRITE rule set of FlinkStreamRuleSets.java.

!image-2022-02-10-20-13-14-424.png|width=645,height=384!

cc [Leonard Xu|https://issues.apache.org/jira/secure/ViewProfile.jspa?name=leonard], [Godfrey He|https://issues.apache.org/jira/secure/ViewProfile.jspa?name=godfrey], [Jark Wu|https://issues.apache.org/jira/secure/ViewProfile.jspa?name=jark]

> one sql has row_number =1 and the subsequent SQL has "case when" and "where" statement result Exception : The window can only be ordered in ASCENDING mode
> ----------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26051
>                 URL: https://issues.apache.org/jira/browse/FLINK-26051
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.2
>            Reporter: chuncheng wu
>            Priority: Major
>        Attachments: image-2022-02-10-20-13-14-424.png
>
>
> Hello,
> I have two SQLs. One (sql0) is "select xx from ( ROW_NUMBER statement ) where rn = 1", and the other (sql1) is "select ${fields} from result where ${filter_conditions}". The fields selected in sql1 include one "case when" field. The two SQLs work well separately, but combined they raise the exception below.
> It happens when the logical plan is translated into the physical plan:
>
> {code:java}
> org.apache.flink.table.api.TableException: The window can only be ordered in ASCENDING mode.
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:98)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:52)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregateBase.translateToPlan(StreamExecOverAggregateBase.scala:42)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
>     at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>     at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
>     at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
>     at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>     at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65)
>     at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:103)
>     at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:42)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:630)
>     at org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:582)
>     at com.meituan.grocery.data.flink.test.BugTest.testRowNumber(BugTest.java:69)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:568)
>     at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
>     at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60){code}
> In the stack trace above, ROW_NUMBER()'s physical rel, which is normally StreamExecRank, changes to StreamExecOverAggregate. The StreamExecOverAggregate rel has a window of ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, which I never added. Oddly, if you remove the "case when" field or the "where" statement in sql1, the program works well and the exception disappears; at the same time, ROW_NUMBER()'s physical rel changes back to StreamExecRank.
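The StreamExecRank-vs-StreamExecOverAggregate difference described above comes down to rule pattern matching, as the comment at the top of this page analyzes: the CASE WHEN projection stacks a second FlinkLogicalCalc between the filtering Calc and the over-aggregate, so a rank rule that expects a Calc directly on top of the over-aggregate never fires. The following is a minimal, self-contained sketch of that matching behavior; the node classes and the two "rules" are hypothetical stand-ins for illustration only, not Flink's actual planner API.

```java
import java.util.ArrayList;
import java.util.List;

public class RankRuleSketch {
    interface Node {}
    record Scan() implements Node {}                         // table scan
    record OverAgg(Node input) implements Node {}            // ROW_NUMBER over window
    record Calc(Node input, List<String> projects) implements Node {}
    record Rank(Node input) implements Node {}               // top-N / dedup operator

    // Hypothetical stand-in for a rank rule: fires only when a Calc sits
    // directly on top of an over-aggregate.
    static Node applyRankRule(Node root) {
        if (root instanceof Calc c && c.input() instanceof OverAgg o) {
            return new Rank(o.input());  // over-aggregate rewritten into a rank
        }
        return root;                     // pattern did not match; plan unchanged
    }

    // Hypothetical stand-in for a calc-merge rule: collapses two adjacent Calcs.
    static Node applyCalcMerge(Node root) {
        if (root instanceof Calc outer && outer.input() instanceof Calc inner) {
            List<String> merged = new ArrayList<>(outer.projects());
            merged.addAll(inner.projects());
            return new Calc(inner.input(), merged);
        }
        return root;
    }

    public static void main(String[] args) {
        // Plan shape with the CASE WHEN field: Calc -> Calc -> OverAgg -> Scan
        Node plan = new Calc(
                new Calc(new OverAgg(new Scan()), List.of("w0$o0")),
                List.of("parent_task_no_cw"));

        // Rank rule alone: the stacked Calc blocks the match, OverAgg survives.
        System.out.println(applyRankRule(plan) instanceof Rank);                  // false

        // Merging the Calcs first restores the match and the rank rule fires.
        System.out.println(applyRankRule(applyCalcMerge(plan)) instanceof Rank);  // true
    }
}
```

This mirrors the fix proposed in the comment (running FlinkCalcMergeRule before FlinkLogicalRankRule in the LOGICAL_REWRITE rule set): once the adjacent Calc nodes are merged, the rank pattern matches and the over-aggregate never reaches physical planning.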
> It's been confusing me a lot.
>
> Example code:
> {code:java}
> import org.apache.flink.api.java.tuple.Tuple12;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.source.SourceFunction;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.ExplainDetail;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
> import org.junit.jupiter.api.Test;
>
> import java.sql.Timestamp;
>
> import static org.apache.flink.table.api.Expressions.$;
>
> public class BugTest {
>
>     @Test
>     public void testRowNumber() throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         EnvironmentSettings mySetting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, mySetting);
>         env.setParallelism(1);
>
>         // source table, name is wosOutSowingTaskDetail
>         DataStream<Tuple12<String, Integer, Integer, String, Integer, Integer, Integer, Integer, Integer, String, String, String>> oriStream =
>                 env.addSource(new CustomSourceRowNumber());
>         Table testTable = tableEnv.fromDataStream(oriStream, $("biz_bill_no"), $("task_type"), $("task_mode"),
>                 $("parent_task_no"), $("total_stage_num"), $("current_stage_index"), $("use_pre_task_owner"),
>                 $("poi_type"), $("biz_origin_bill_type"), $("sowing_task_no"), $("dt"), $("sowing_task_detail_id"));
>         tableEnv.createTemporaryView("wosOutSowingTaskDetail", testTable);
>
>         // sql0: select xx from (ROW_NUMBER statement) where rn = 1
>         Table wosOutSowingTaskDetailLatest = tableEnv.sqlQuery("SELECT `biz_bill_no`\n"
>                 + ",task_type\n"
>                 + ",task_mode\n"
>                 + ",parent_task_no\n"
>                 + " FROM (\n"
>                 + "   SELECT *,\n"
>                 + "   ROW_NUMBER() OVER(PARTITION BY sowing_task_detail_id ORDER BY task_type desc) AS rn\n"
>                 + "   FROM wosOutSowingTaskDetail\n"
>                 + " ) tmp\n"
>                 + " WHERE rn = 1");
>         System.out.println("SQL 0 Plan: ");
>         System.out.println(wosOutSowingTaskDetailLatest.explain(ExplainDetail.ESTIMATED_COST));
>         System.out.println(wosOutSowingTaskDetailLatest.explain(ExplainDetail.CHANGELOG_MODE));
>         tableEnv.createTemporaryView("wosOutSowingTaskDetailLatest", wosOutSowingTaskDetailLatest);
>
>         // sql1: select from sql0's result table; the field list has a CASE WHEN field
>         Table resultTable = tableEnv.sqlQuery("SELECT\n"
>                 + "biz_bill_no,\n"
>                 + "CASE WHEN task_type = 21 THEN parent_task_no\n"
>                 + "     ELSE null END AS parent_task_no_cw\n"
>                 + "FROM wosOutSowingTaskDetailLatest\n"
>                 + "WHERE task_type = 21\n"
>                 + "AND task_mode IN (51, 40)\n");
>         // the same sql without the CASE WHEN field works well:
>         // Table resultTable = tableEnv.sqlQuery("SELECT\n"
>         //         + "biz_bill_no\n"
>         //         + "FROM wosOutSowingTaskDetailLatest\n"
>         //         + "WHERE task_type = 21\n"
>         //         + "AND task_mode IN (51, 40)\n");
>         System.out.println("SQL 1 Plan: ");
>         System.out.println(resultTable.explain(ExplainDetail.ESTIMATED_COST));
>         System.out.println(resultTable.explain(ExplainDetail.CHANGELOG_MODE));
>         DataStream<Tuple2<Boolean, Row>> resultStream = tableEnv.toRetractStream(resultTable, Row.class);
>         env.execute();
>     }
>
>     // user-defined source, just for test, not used elsewhere
>     static class CustomSourceRowNumber implements SourceFunction<Tuple12<String, Integer, Integer, String, Integer, Integer, Integer, Integer, Integer, String, String, String>> {
>
>         private boolean isRuning = true;
>
>         @Override
>         public void run(SourceContext<Tuple12<String, Integer, Integer, String, Integer, Integer, Integer, Integer, Integer, String, String, String>> sourceContext) throws Exception {
>             while (isRuning) {
>                 sourceContext.collect(Tuple12.of("xxx", 21, 51, "yyy", 1, 1, 0, 2, 110, "zzz", "aaa", "bbb"));
>                 sourceContext.collect(Tuple12.of("xxx", 21, 40, "yyy", 2, 2, 1, 2, 110, "zzz", "aaa", "bbb"));
>                 Thread.sleep(Integer.MAX_VALUE);
>             }
>         }
>
>         @Override
>         public void cancel() {
>             isRuning = false;
>         }
>     }
> }
> {code}
>
> System.out.println output:
> {code:java}
> SQL 0 Plan: 
> == Abstract Syntax Tree ==
> LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], sowing_task_no=[$9])
> +- LogicalFilter(condition=[=($12, 1)])
>    +- LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], sowing_task_no=[$9], dt=[$10], sowing_task_detail_id=[$11], rn=[ROW_NUMBER() OVER (PARTITION BY $10, $11 ORDER BY $1 DESC NULLS LAST)])
>       +- LogicalTableScan(table=[[default_catalog, default_database, wosOutSowingTaskDetail]])
>
> == Optimized Logical Plan ==
> Calc(select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no], changelogMode=[I,UA,D])
> +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, sowing_task_detail_id], orderBy=[task_type DESC], select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id], changelogMode=[I,UA,D])
>    +- Exchange(distribution=[hash[dt, sowing_task_detail_id]], changelogMode=[I])
>       +- DataStreamScan(table=[[default_catalog, default_database, wosOutSowingTaskDetail]], fields=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id], changelogMode=[I])
>
> == Physical Execution Plan ==
> Stage 1 : Data Source
> 	content : Source: Custom Source
>
> 	Stage 7 : Operator
> 		content : SourceConversion(table=[default_catalog.default_database.wosOutSowingTaskDetail], fields=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id])
> 		ship_strategy : FORWARD
>
> 	Stage 9 : Operator
> 		content : Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, sowing_task_detail_id], orderBy=[task_type DESC], select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id])
> 		ship_strategy : HASH
>
> 	Stage 10 : Operator
> 		content : Calc(select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no])
> 		ship_strategy : FORWARD
>
> SQL 1 Plan: 
> org.apache.flink.table.api.TableException: The window can only be ordered in ASCENDING mode.
> {code}

--
This message was sent by Atlassian Jira
(v8.20.1#820001)