P Rohan Kumar created FLINK-31165:
-------------------------------------
Summary: Over Agg: The window rank function without order by error
in top N query
Key: FLINK-31165
URL: https://issues.apache.org/jira/browse/FLINK-31165
Project: Flink
Issue Type: Bug
Components: Table SQL / API
Affects Versions: 1.16.0
Reporter: P Rohan Kumar
{code:java}
// Minimal reproduction for FLINK-31165: a Top-N query (row_number() ... where rownum <= 1)
// whose OVER window is ordered by a column derived from a constant expression.
val env: StreamExecutionEnvironment =
  StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// Bounded "datagen" source: 10 rows in total, produced at 10 rows per second.
val td = TableDescriptor
  .forConnector("datagen")
  .option("rows-per-second", "10")
  .option("number-of-rows", "10")
  .schema(
    Schema
      .newBuilder()
      .column("NAME", DataTypes.VARCHAR(2147483647))
      .column("ROLLNO", DataTypes.DECIMAL(5, 0))
      .column("DOB", DataTypes.DATE())
      .column("CLASS", DataTypes.DECIMAL(2, 0))
      .column("SUBJECT", DataTypes.VARCHAR(2147483647))
      .build())
  .build()
val table = tableEnv.from(td)
tableEnv.createTemporaryView("temp_table", table)

// Adds SRC_NO, a column whose value is the same constant date for every row.
// The SQL text is kept on one logical line: a regular "..." literal cannot span lines.
val newTable = tableEnv.sqlQuery(
  "select temp_table.*,cast('2022-01-01' as date) SRC_NO from temp_table")
tableEnv.createTemporaryView("temp_table2", newTable)

// Top-N query ordered by the constant SRC_NO column.
// NOTE(review): the LogicalWindow printed in the reported error lists only `partition {0}`
// and no order key; presumably the constant ORDER BY key is dropped during planning —
// confirm against the planner.
val newTable2 = tableEnv.sqlQuery(
  "select * from (select " +
    "NAME,ROLLNO,row_number() over (partition by NAME ORDER BY SRC_NO) AS rownum " +
    "from temp_table2 a) where rownum <= 1")

// Per the reported stack trace, the ValidationException is raised from this
// toChangelogStream call while the plan is being optimized.
tableEnv.toChangelogStream(newTable2).print()
env.execute()
{code}
Running the above code fails with the error below, even though the OVER clause
already specifies an ORDER BY column (SRC_NO).
If I change the ORDER BY column to some other column, such as "SUBJECT", then
the job runs fine.
{code:java}
Exception in thread "main" java.lang.RuntimeException: Error while applying
rule FlinkLogicalOverAggregateConverter(in:NONE,out:LOGICAL), args
[rel#245:LogicalWindow.NONE.any.None:
0.[NONE].[NONE](input=RelSubset#244,window#0=window(partition {0} rows between
UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()]))]
at
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:256)
at
org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58)
at
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510)
at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
at
scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:187)
at
scala.collection.TraversableOnce$folder$1$.apply(TraversableOnce.scala:185)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:189)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:184)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
at
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
at
org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
at
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
at
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
at
org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
at
org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
at
org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toChangelogStream(StreamTableEnvironmentImpl.scala:160)
at org.example.OverAggregateBug$.main(OverAggregateBug.scala:39)
at org.example.OverAggregateBug.main(OverAggregateBug.scala)
Caused by: org.apache.flink.table.api.ValidationException: Over Agg: The window
rank function without order by. please re-check the over window statement.
at
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$2(FlinkLogicalOverAggregate.scala:95)
at
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$2$adapted(FlinkLogicalOverAggregate.scala:92)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$1(FlinkLogicalOverAggregate.scala:92)
at
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.$anonfun$convert$1$adapted(FlinkLogicalOverAggregate.scala:89)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregateConverter.convert(FlinkLogicalOverAggregate.scala:89)
at
org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:167)
at
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:229)
... 27 more {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)