[ https://issues.apache.org/jira/browse/FLINK-26051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
chuncheng wu updated FLINK-26051: --------------------------------- Description: hello, i have 2 sqls. One sql (sql0) is "select xx from ( ROW_NUMBER stament) where rn=1" and the other one (sql1) is "s{color:#505f79}elect ${fields} from result where ${filter_conditions}{color}" . The fields quoted in sql1 has one "case when" field .The two sql can work well seperately.but if they combine it results the exception as follow . It happen in the occasion when logical plan turn into physical plan : {code:java} org.apache.flink.table.api.TableException: The window can only be ordered in ASCENDING mode. at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:98) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:52) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregateBase.translateToPlan(StreamExecOverAggregateBase.scala:42) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65) at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:103) at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:42) at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:630) at org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:582) at com.meituan.grocery.data.flink.test.BugTest.testRowNumber(BugTest.java:69) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:568) at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725) at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60){code} In the stacktrace above , rownumber() 's physical rel which is StreamExecRank In nomal change to StreamExecOverAggregate . The StreamExecOverAggregate rel has a window= ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW which i never add .Oddly,if you remove the "case when" field or the "where" statement in sql1 ,the program will work well, the exception disappear. In the same time, rownumber() 's physical rel change back to StreamExecRank. Its confusing me a lot example code : {code:java} @Test public void testRowNumber() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings mySetting = EnvironmentSettings .newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); env.setParallelism(1); Configuration configuration = tableEnv.getConfig().getConfiguration(); DataStream<Tuple12<String, Integer, Integer, String, Integer, Integer, Integer, Integer, Integer, String, String, String>> oriStream = env.addSource(new CustomSourceRowNumber()); Table testTable = tableEnv.fromDataStream( oriStream, $("biz_bill_no"), $("task_type"), $("task_mode"), $("parent_task_no"), $("total_stage_num"), $("current_stage_index"), $("use_pre_task_owner"), $("poi_type"), $("biz_origin_bill_type"), $("sowing_task_no"), $("dt"), $("sowing_task_detail_id")); tableEnv.createTemporaryView("wosOutSowingTaskDetail", testTable); //sql0 Table wosOutSowingTaskDetailLatest = tableEnv.sqlQuery( "SELECT `biz_bill_no`\n" + ",task_type\n" + ",task_mode\n" + ",parent_task_no\n" + ",total_stage_num\n" + ",current_stage_index\n" + ",use_pre_task_owner\n" + ",poi_type\n" + ",biz_origin_bill_type\n" + ",sowing_task_no\n" + " FROM (\n" + " SELECT *,\n" + " ROW_NUMBER() OVER(PARTITION BY dt,sowing_task_detail_id ORDER BY task_type desc) AS rn\n" + " FROM wosOutSowingTaskDetail\n" + " ) tmp\n" + " WHERE rn = 1"); tableEnv.createTemporaryView("wosOutSowingTaskDetailLatest", wosOutSowingTaskDetailLatest); //sql1 ,if we remove "CASE WHEN task_mode = 51 THEN parent_task_no\n" + " WHEN //task_mode = 40 AND total_stage_num >= 2 AND current_stage_index >= 2 AND use_pre_task_owner = 1 THEN //parent_task_no\n" + " ELSE sowing_task_no END AS parent_task_no_cw\n", the program will work well Table resultTable = tableEnv.sqlQuery("SELECT\n" + "biz_bill_no\n" + ", CASE WHEN task_mode = 51 THEN parent_task_no\n" + " WHEN task_mode = 40 AND total_stage_num >= 2 AND current_stage_index >= 2 AND use_pre_task_owner = 1 THEN parent_task_no\n" + " ELSE sowing_task_no END AS parent_task_no_cw\n" + ",parent_task_no" + ",sowing_task_no, " + "task_type, task_mode, " + "total_stage_num, " + "current_stage_index," + "use_pre_task_owner \n" + "FROM wosOutSowingTaskDetailLatest\n" + "WHERE task_type = 21\n" + "AND task_mode IN (51, 40)\n" + "AND poi_type = 2\n" + "AND biz_origin_bill_type not in (111,112,113,114)"); System.out.println(resultTable.explain()); } class CustomSourceRowNumber implements SourceFunction<Tuple12<String, Integer, Integer, String, Integer, Integer, Integer, Integer, Integer, String, String, String>> { private boolean isRuning = true; @Override public void run( SourceContext<Tuple12<String, Integer, Integer, String, Integer, Integer, Integer, Integer, Integer, String, String, String>> sourceContext) throws Exception { while (isRuning) { sourceContext.collect(Tuple12.of("xxx", 21, 51, "yyy", 1, 1, 0, 2, 110, "zzz", "aaa", "bbb")); sourceContext.collect(Tuple12.of("xxx", 21, 40, "yyy", 2, 2, 1, 2, 110, "zzz", "aaa", "bbb")); Thread.sleep(Integer.MAX_VALUE); } } @Override public void cancel() { isRuning = false; } } {code} System.out.println {code:java} SQL 0 Plan: == Abstract Syntax Tree == LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], sowing_task_no=[$9]) +- LogicalFilter(condition=[=($12, 1)]) +- LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], sowing_task_no=[$9], dt=[$10], sowing_task_detail_id=[$11], rn=[ROW_NUMBER() OVER (PARTITION BY $10, $11 ORDER BY $1 DESC NULLS LAST)]) +- LogicalTableScan(table=[[default_catalog, default_database, wosOutSowingTaskDetail]]) == Optimized Logical Plan == Calc(select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no], changelogMode=[I,UA,D]) +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, sowing_task_detail_id], orderBy=[task_type DESC], select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id], changelogMode=[I,UA,D]) +- Exchange(distribution=[hash[dt, sowing_task_detail_id]], changelogMode=[I]) +- DataStreamScan(table=[[default_catalog, default_database, wosOutSowingTaskDetail]], fields=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id], changelogMode=[I]) == Physical Execution Plan == Stage 1 : Data Source content : Source: Custom Source Stage 7 : Operator content : SourceConversion(table=[default_catalog.default_database.wosOutSowingTaskDetail], fields=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id]) ship_strategy : FORWARD Stage 9 : Operator content : Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, sowing_task_detail_id], orderBy=[task_type DESC], select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id]) ship_strategy : HASH Stage 10 : Operator content : Calc(select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no]) ship_strategy : FORWARD SQL 1 Plan: org.apache.flink.table.api.TableException: The window can only be ordered in ASCENDING mode. {code} was: hello, i have 2 sqls. One sql (sql0) is "select xx from ( ROW_NUMBER stament) where rn=1" and the other one (sql1) is "s{color:#505f79}elect ${fields} from result where ${filter_conditions}{color}" . The fields quoted in sql1 has one "case when" field .The two sql can work well seperately.but if they combine it results the exception as follow . It happen in the occasion when logical plan turn into physical plan : {code:java} org.apache.flink.table.api.TableException: The window can only be ordered in ASCENDING mode. at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:98) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:52) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregateBase.translateToPlan(StreamExecOverAggregateBase.scala:42) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65) at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:103) at org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:42) at org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:630) at org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:582) at com.meituan.grocery.data.flink.test.BugTest.testRowNumber(BugTest.java:69) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:568) at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725) at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60){code} In the stacktrace above , rownumber() 's physical rel which is StreamExecRank In nomal change to StreamExecOverAggregate . The StreamExecOverAggregate rel has a window= ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW which i never add .Oddly,if you remove the "case when" field or the "where" statement in sql1 ,the program will work well, the exception disappear. In the same time, rownumber() 's physical rel change back to StreamExecRank. Its confusing me a lot example code : {code:java} import org.apache.flink.api.java.tuple.Tuple12; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.ExplainDetail; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; import org.junit.jupiter.api.Test; import java.sql.Timestamp; import static org.apache.flink.table.api.Expressions.$; public class BugTest { @Test public void testRowNumber() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings mySetting=EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, mySetting); env.setParallelism(1); //source table ,name is wosOutSowingTaskDetail DataStream<Tuple12<String, Integer, Integer, String, Integer, Integer, Integer, Integer, Integer, String, String, String>> oriStream = env.addSource(new CustomSourceRowNumber()); Table testTable = tableEnv.fromDataStream(oriStream, $("biz_bill_no"), $("task_type"), $("task_mode"), $("parent_task_no"), $("total_stage_num"), $("current_stage_index"), $("use_pre_task_owner"), $("poi_type"), $("biz_origin_bill_type"), $("sowing_task_no"), $("dt"), $("sowing_task_detail_id")); tableEnv.createTemporaryView("wosOutSowingTaskDetail", testTable); //sql 0,select xx from ( ROW_NUMBER stament) where rn=1 Table wosOutSowingTaskDetailLatest = tableEnv.sqlQuery( "SELECT `biz_bill_no`\n" + ",task_type\n" + ",task_mode\n" + ",parent_task_no\n" + ",total_stage_num\n" + ",current_stage_index\n" + ",use_pre_task_owner\n" + ",poi_type\n" + ",biz_origin_bill_type\n" + ",sowing_task_no\n" + " FROM (\n" + " SELECT *,\n" + " ROW_NUMBER() OVER(PARTITION BY dt,sowing_task_detail_id ORDER BY task_type desc) AS rn\n" + " FROM wosOutSowingTaskDetail\n" + " ) tmp\n" + " WHERE rn = 1"); System.out.println("SQL 0 Plan: "); System.out.println(wosOutSowingTaskDetailLatest.explain(ExplainDetail.ESTIMATED_COST)); System.out.println(wosOutSowingTaskDetailLatest.explain(ExplainDetail.CHANGELOG_MODE)); tableEnv.createTemporaryView("wosOutSowingTaskDetailLatest", wosOutSowingTaskDetailLatest); //sql 1:select from sql0'result table ,field has a CASE WHEN field Table resultTable = tableEnv.sqlQuery("SELECT\n" + "biz_bill_no\n" + ", CASE WHEN task_mode = 51 THEN parent_task_no\n" + " WHEN task_mode = 40 AND total_stage_num >= 2 AND current_stage_index >= 2 AND use_pre_task_owner = 1 THEN parent_task_no\n" + " ELSE sowing_task_no END AS parent_task_no_cw\n" + ",parent_task_no" + ",sowing_task_no, " + "task_type, task_mode, " + "total_stage_num, " + "current_stage_index," + "use_pre_task_owner \n" + "FROM wosOutSowingTaskDetailLatest\n" + "WHERE task_type = 21\n" + "AND task_mode IN (51, 40)\n" + "AND poi_type = 2\n" + "AND biz_origin_bill_type not in (111,112,113,114)"); // this sql remove CASE WHEN field can work System.out.println("SQL 1 Plan: "); System.out.println(resultTable.explain(ExplainDetail.ESTIMATED_COST)); System.out.println(resultTable.explain(ExplainDetail.CHANGELOG_MODE)); DataStream<Tuple2<Boolean, Row>> resultStream = tableEnv.toRetractStream(resultTable, Row.class); env.execute(); } //user define source ,just for test,no use here static class CustomSourceRowNumber implements SourceFunction<Tuple12<String, Integer, Integer, String, Integer, Integer, Integer, Integer, Integer, String, String, String>> { private boolean isRuning = true; @Override public void run(SourceContext<Tuple12<String, Integer, Integer, String, Integer, Integer, Integer, Integer, Integer, String, String, String>> sourceContext) throws Exception { while (isRuning) { sourceContext.collect(Tuple12.of("xxx",21,51,"yyy",1,1,0,2,110,"zzz","aaa","bbb")); sourceContext.collect(Tuple12.of("xxx",21,40,"yyy",2,2,1,2,110,"zzz","aaa","bbb")); Thread.sleep(Integer.MAX_VALUE); } } @Override public void cancel() { isRuning = false; } } } {code} System.out.println {code:java} SQL 0 Plan: == Abstract Syntax Tree == LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], sowing_task_no=[$9]) +- LogicalFilter(condition=[=($12, 1)]) +- LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], sowing_task_no=[$9], dt=[$10], sowing_task_detail_id=[$11], rn=[ROW_NUMBER() OVER (PARTITION BY $10, $11 ORDER BY $1 DESC NULLS LAST)]) +- LogicalTableScan(table=[[default_catalog, default_database, wosOutSowingTaskDetail]]) == Optimized Logical Plan == Calc(select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no], changelogMode=[I,UA,D]) +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, sowing_task_detail_id], orderBy=[task_type DESC], select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id], changelogMode=[I,UA,D]) +- Exchange(distribution=[hash[dt, sowing_task_detail_id]], changelogMode=[I]) +- DataStreamScan(table=[[default_catalog, default_database, wosOutSowingTaskDetail]], fields=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id], changelogMode=[I]) == Physical Execution Plan == Stage 1 : Data Source content : Source: Custom Source Stage 7 : Operator content : SourceConversion(table=[default_catalog.default_database.wosOutSowingTaskDetail], fields=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id]) ship_strategy : FORWARD Stage 9 : Operator content : Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, sowing_task_detail_id], orderBy=[task_type DESC], select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id]) ship_strategy : HASH Stage 10 : Operator content : Calc(select=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no]) ship_strategy : FORWARD SQL 1 Plan: org.apache.flink.table.api.TableException: The window can only be ordered in ASCENDING mode. {code} > one sql has row_number =1 and the subsequent SQL has "case when" and "where" > statement result Exception : The window can only be ordered in ASCENDING mode > ---------------------------------------------------------------------------------------------------------------------------------------------------------- > > Key: FLINK-26051 > URL: https://issues.apache.org/jira/browse/FLINK-26051 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.12.2 > Reporter: chuncheng wu > Priority: Major > Attachments: image-2022-02-10-20-13-14-424.png > > > hello, > i have 2 sqls. One sql (sql0) is "select xx from ( ROW_NUMBER stament) > where rn=1" and the other one (sql1) is "s{color:#505f79}elect ${fields} > from result where ${filter_conditions}{color}" . The fields quoted in sql1 > has one "case when" field .The two sql can work well seperately.but if they > combine it results the exception as follow . It happen in the occasion when > logical plan turn into physical plan : > > {code:java} > org.apache.flink.table.api.TableException: The window can only be ordered in > ASCENDING mode. > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:98) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregate.translateToPlanInternal(StreamExecOverAggregate.scala:52) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecOverAggregateBase.translateToPlan(StreamExecOverAggregateBase.scala:42) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39) > at > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59) > at > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66) > at > org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:65) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:65) > at > org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:103) > at > org.apache.flink.table.planner.delegation.StreamPlanner.explain(StreamPlanner.scala:42) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.explainInternal(TableEnvironmentImpl.java:630) > at > org.apache.flink.table.api.internal.TableImpl.explain(TableImpl.java:582) > at > com.meituan.grocery.data.flink.test.BugTest.testRowNumber(BugTest.java:69) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:568) > at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725) > at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60){code} > In the stacktrace above , rownumber() 's physical rel which is > StreamExecRank In nomal change to StreamExecOverAggregate . The > StreamExecOverAggregate rel has a window= ROWS BETWEEN UNBOUNDED PRECEDING > AND CURRENT ROW which i never add .Oddly,if you remove the "case when" field > or the "where" statement in sql1 ,the program will work well, the exception > disappear. In the same time, rownumber() 's physical rel change back to > StreamExecRank. Its confusing me a lot > > example code : > {code:java} > @Test > public void testRowNumber() throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > EnvironmentSettings mySetting = EnvironmentSettings > .newInstance() > .useBlinkPlanner() > .inStreamingMode() > .build(); > StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); > env.setParallelism(1); > Configuration configuration = tableEnv.getConfig().getConfiguration(); > DataStream<Tuple12<String, Integer, Integer, String, Integer, > Integer, Integer, Integer, > Integer, String, String, String>> oriStream = > env.addSource(new CustomSourceRowNumber()); > Table testTable = tableEnv.fromDataStream( > oriStream, > $("biz_bill_no"), > $("task_type"), > $("task_mode"), > $("parent_task_no"), > $("total_stage_num"), > $("current_stage_index"), > $("use_pre_task_owner"), > $("poi_type"), > $("biz_origin_bill_type"), > $("sowing_task_no"), > $("dt"), > $("sowing_task_detail_id")); > tableEnv.createTemporaryView("wosOutSowingTaskDetail", testTable); > //sql0 > Table wosOutSowingTaskDetailLatest = tableEnv.sqlQuery( > "SELECT `biz_bill_no`\n" + > ",task_type\n" + > ",task_mode\n" + > ",parent_task_no\n" + > ",total_stage_num\n" + > ",current_stage_index\n" + > ",use_pre_task_owner\n" + > ",poi_type\n" + > ",biz_origin_bill_type\n" + > ",sowing_task_no\n" + > " FROM (\n" + > " SELECT *,\n" + > " ROW_NUMBER() OVER(PARTITION BY > dt,sowing_task_detail_id ORDER BY task_type desc) AS rn\n" > + > " FROM wosOutSowingTaskDetail\n" + > " ) tmp\n" + > " WHERE rn = 1"); > tableEnv.createTemporaryView("wosOutSowingTaskDetailLatest", > wosOutSowingTaskDetailLatest); > //sql1 ,if we remove "CASE WHEN task_mode = 51 THEN > parent_task_no\n" + " WHEN > //task_mode = 40 AND total_stage_num >= 2 AND current_stage_index >= > 2 AND use_pre_task_owner = 1 THEN > //parent_task_no\n" + " ELSE sowing_task_no END > AS parent_task_no_cw\n", the program will work well > Table resultTable = tableEnv.sqlQuery("SELECT\n" + > "biz_bill_no\n" + > ", CASE WHEN task_mode = 51 THEN parent_task_no\n" + > " WHEN task_mode = 40 AND total_stage_num >= 2 AND > current_stage_index >= 2 AND use_pre_task_owner = 1 THEN parent_task_no\n" + > " ELSE sowing_task_no END AS parent_task_no_cw\n" + > ",parent_task_no" > + ",sowing_task_no, " > + "task_type, task_mode, " > + "total_stage_num, " > + "current_stage_index," > + "use_pre_task_owner \n" + > "FROM wosOutSowingTaskDetailLatest\n" + > "WHERE task_type = 21\n" + > "AND task_mode IN (51, 40)\n" + > "AND poi_type = 2\n" + > "AND biz_origin_bill_type not in (111,112,113,114)"); > > System.out.println(resultTable.explain()); > } > class CustomSourceRowNumber implements SourceFunction<Tuple12<String, > Integer, Integer, String, Integer, Integer, > Integer, Integer, > Integer, String, String, String>> { > private boolean isRuning = true; @Override > public void run( > SourceContext<Tuple12<String, Integer, Integer, String, > Integer, Integer, Integer, Integer, > Integer, String, String, String>> sourceContext) > throws Exception { > while (isRuning) { > sourceContext.collect(Tuple12.of("xxx", > 21, > 51, > "yyy", > 1, > 1, > 0, > 2, > 110, > "zzz", > "aaa", > "bbb")); > sourceContext.collect(Tuple12.of("xxx", > 21, > 40, > "yyy", > 2, > 2, > 1, > 2, > 110, > "zzz", > "aaa", > "bbb")); > Thread.sleep(Integer.MAX_VALUE); > } > } @Override > public void cancel() { > isRuning = false; > } > } {code} > System.out.println > {code:java} > SQL 0 Plan: > == Abstract Syntax Tree == > LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], > parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], > use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], > sowing_task_no=[$9]) > +- LogicalFilter(condition=[=($12, 1)]) > +- LogicalProject(biz_bill_no=[$0], task_type=[$1], task_mode=[$2], > parent_task_no=[$3], total_stage_num=[$4], current_stage_index=[$5], > use_pre_task_owner=[$6], poi_type=[$7], biz_origin_bill_type=[$8], > sowing_task_no=[$9], dt=[$10], sowing_task_detail_id=[$11], rn=[ROW_NUMBER() > OVER (PARTITION BY $10, $11 ORDER BY $1 DESC NULLS LAST)]) > +- LogicalTableScan(table=[[default_catalog, default_database, > wosOutSowingTaskDetail]]) > == Optimized Logical Plan == > Calc(select=[biz_bill_no, task_type, task_mode, parent_task_no, > total_stage_num, current_stage_index, use_pre_task_owner, poi_type, > biz_origin_bill_type, sowing_task_no], changelogMode=[I,UA,D]) > +- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], > rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, sowing_task_detail_id], > orderBy=[task_type DESC], select=[biz_bill_no, task_type, task_mode, > parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, > poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id], > changelogMode=[I,UA,D]) > +- Exchange(distribution=[hash[dt, sowing_task_detail_id]], > changelogMode=[I]) > +- DataStreamScan(table=[[default_catalog, default_database, > wosOutSowingTaskDetail]], fields=[biz_bill_no, task_type, task_mode, > parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, > poi_type, biz_origin_bill_type, sowing_task_no, dt, sowing_task_detail_id], > changelogMode=[I]) > == Physical Execution Plan == > Stage 1 : Data Source > content : Source: Custom Source > Stage 7 : Operator > content : > SourceConversion(table=[default_catalog.default_database.wosOutSowingTaskDetail], > fields=[biz_bill_no, task_type, task_mode, parent_task_no, total_stage_num, > current_stage_index, use_pre_task_owner, poi_type, biz_origin_bill_type, > sowing_task_no, dt, sowing_task_detail_id]) > ship_strategy : FORWARD > Stage 9 : Operator > content : Rank(strategy=[AppendFastStrategy], > rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=1], partitionBy=[dt, > sowing_task_detail_id], orderBy=[task_type DESC], select=[biz_bill_no, > task_type, task_mode, parent_task_no, total_stage_num, current_stage_index, > use_pre_task_owner, poi_type, biz_origin_bill_type, sowing_task_no, dt, > sowing_task_detail_id]) > ship_strategy : HASH > Stage 10 : Operator > content : Calc(select=[biz_bill_no, task_type, task_mode, > parent_task_no, total_stage_num, current_stage_index, use_pre_task_owner, > poi_type, biz_origin_bill_type, sowing_task_no]) > ship_strategy : FORWARD > SQL 1 Plan: > org.apache.flink.table.api.TableException: The window can only be ordered in > ASCENDING mode. {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)