[ 
https://issues.apache.org/jira/browse/FLINK-31165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703466#comment-17703466
 ] 

Jane Chan edited comment on FLINK-31165 at 3/22/23 4:29 AM:
------------------------------------------------------------

Hi [~rohankrao] , thanks for reporting this issue.

Actually, the order by field SRC_NO is a constant and is folded during query 
rewrite. See 
[LogicalWindow.java#L371|https://github.com/apache/flink/blob/268fc1a46f8af171c7102229a010af71c56623d0/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/logical/LogicalWindow.java#L371],
 when applying Calcite's ProjectToWindowRule.
{code:java}
LogicalProject(inputs=[0..2])
+- LogicalFilter(condition=[<=($2, 1)])
   +- LogicalProject(inputs=[0..1], exprs=[[ROW_NUMBER() OVER (PARTITION BY $0 
ORDER BY 2022-01-01 NULLS FIRST)]])
      +- LogicalTableScan(table=[[default_catalog, default_database, 
temp_table]]) {code}
>From the perspective of SQL semantics, using a constant as the order by field 
>for row_number has no meaning, because the constant will not change the 
>sorting result of row_number, and each row will get the same rank. As a 
>current workaround, please try to specify another field to ensure that 
>row_number is sorted in the correct order.

While I tested the query against MySQL, PostgreSQL, Spark, and Hive, they do 
allow this to happen, and just output the first inserted row. Do you think we 
need to align this behavior? Or at least throwing a more meaningful error. cc 
[~godfreyhe]  [~lincoln.86xy] 

 

 


was (Author: qingyue):
Hi [~rohankrao] , thanks for reporting this issue.

Actually, the order by field SRC_NO is a constant and is folded during query 
rewrite. See 
[LogicalWindow.java#L371|https://github.com/apache/flink/blob/268fc1a46f8af171c7102229a010af71c56623d0/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/logical/LogicalWindow.java#L371],
 when applying Calcite's ProjectToWindowRule.
{code:java}
LogicalProject(inputs=[0..2])
+- LogicalFilter(condition=[<=($2, 1)])
   +- LogicalProject(inputs=[0..1], exprs=[[ROW_NUMBER() OVER (PARTITION BY $0 
ORDER BY 2022-01-01 NULLS FIRST)]])
      +- LogicalTableScan(table=[[default_catalog, default_database, 
temp_table]]) {code}
>From the perspective of SQL semantics, using a constant as the order by field 
>for row_number has no meaning, because the constant will not change the 
>sorting result of row_number, and each row will get the same rank. As a 
>current workaround, please try to specify another field to ensure that 
>row_number is sorted in the correct order.

While I tested the query against MySQL, PostgreSQL, Spark, and Hive, they do 
allow this to happen, and just output the first inserted row. Do you think we 
need to align this behavior? cc [~godfreyhe]  [~lincoln.86xy] 

 

 

> Over Agg: The window rank function without order by error in top N query
> ------------------------------------------------------------------------
>
>                 Key: FLINK-31165
>                 URL: https://issues.apache.org/jira/browse/FLINK-31165
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.16.0
>            Reporter: P Rohan Kumar
>            Priority: Major
>
>  
> {code:java}
> val env: StreamExecutionEnvironment = 
> StreamExecutionEnvironment.getExecutionEnvironment
> val tableEnv = StreamTableEnvironment.create(env)
> val td = TableDescriptor.forConnector("datagen").option("rows-per-second", 
> "10")
>   .option("number-of-rows", "10")
>   .schema(Schema
>     .newBuilder()
>     .column("NAME", DataTypes.VARCHAR(2147483647))
>     .column("ROLLNO", DataTypes.DECIMAL(5, 0))
>     .column("DOB", DataTypes.DATE())
>     .column("CLASS", DataTypes.DECIMAL(2, 0))
>     .column("SUBJECT", DataTypes.VARCHAR(2147483647))
>     .build())
>   .build()
> val table = tableEnv.from(td)
> tableEnv.createTemporaryView("temp_table", table)
> val newTable = tableEnv.sqlQuery("select temp_table.*,cast('2022-01-01' as 
> date) SRC_NO from temp_table")
> tableEnv.createTemporaryView("temp_table2", newTable)
> val newTable2 = tableEnv.sqlQuery("select * from (select 
> NAME,ROLLNO,row_number() over (partition by NAME ORDER BY SRC_NO) AS rownum  
> from temp_table2 a) where rownum <= 1")
> tableEnv.toChangelogStream(newTable2).print()
> env.execute()
>  {code}
>  
>  
> I am getting the below error if I run the above code.
> I have already provided an order by column.
> If I change the order by column to some other column, such as "SUBJECT", then 
> the job runs fine.
>  
>  
> {code:java}
> Exception in thread "main" java.lang.RuntimeException: Error while applying 
> rule FlinkLogicalOverAggregateConverter(in:NONE,out:LOGICAL), args 
> [rel#245:LogicalWindow.NONE.any.None: 
> 0.[NONE].[NONE](input=RelSubset#244,window#0=window(partition {0} rows 
> between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()]))]
>     at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256)
>     at 
> org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510)
>     at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
>     at 
> scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
>     at 
> scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
>     at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
>     at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
>     at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
>     at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
>     at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
>     at 
> org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
>     at 
> org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
>     at 
> org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toChangelogStream(StreamTableEnvironmentImpl.scala:160)
>     at org.example.OverAggregateBug$.main(OverAggregateBug.scala:39)
>     at org.example.OverAggregateBug.main(OverAggregateBug.scala)
> Caused by: org.apache.flink.table.api.ValidationException: Over Agg: The 
> window rank function without order by. please re-check the over window 
> statement.
>     at 
> org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$2(FlinkLogicalOverAggregate.scala:95)
>     at 
> org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$2$adapted(FlinkLogicalOverAggregate.scala:92)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at 
> org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$1(FlinkLogicalOverAggregate.scala:92)
>     at 
> org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$1$adapted(FlinkLogicalOverAggregate.scala:89)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at 
> org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.convert(FlinkLogicalOverAggregate.scala:89)
>     at 
> org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:167)
>     at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:229)
>     ... 27 more {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to