[ 
https://issues.apache.org/jira/browse/FLINK-39715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18082369#comment-18082369
 ] 

Yaoxuan Wu commented on FLINK-39715:
------------------------------------

Hi [~arvindk12], I’m not actively working on a patch for this ticket, so please
feel free to take it over. Thanks!

> [Table/Planner] IndexOutOfBoundsException in FlinkExpandConversionRule for 
> ORDER BY followed by global aggregate in batch mode
> ------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39715
>                 URL: https://issues.apache.org/jira/browse/FLINK-39715
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 2.0.1, 2.2.1
>         Environment: - Batch mode
> - PyFlink Table API
> - parallelism.default = 1
>            Reporter: Yaoxuan Wu
>            Priority: Major
>
> In Flink batch mode, executing a global aggregate, such as MAX, MIN, COUNT, 
> or AVG, on a table that was previously sorted with ORDER BY can cause the 
> query planner to crash with an IndexOutOfBoundsException inside 
> FlinkExpandConversionRule.
>  
> Minimal reproducer:
> {code:java}
> from pyflink.table import TableEnvironment, EnvironmentSettings
> from pyflink.table import expressions as T
> from pyflink.table.types import DataTypes
> t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
> t_env.get_config().set('parallelism.default', '1')
> src = t_env.from_elements(
>     [[1, 'x']],
>     DataTypes.ROW([
>         DataTypes.FIELD('a', DataTypes.INT()),
>         DataTypes.FIELD('b', DataTypes.STRING()),
>     ])
> )
> # ORDER BY b, then global MAX(a).
> # The sort key refers to input field index 1, while the aggregate output has width 1.
> src.order_by(T.col('b').asc).select(T.col('a').max).execute().collect() {code}
> Expected behavior:
> The query should execute successfully and return:
> {code:java}
> [Row(1)] {code}
>  
> Actual behavior:
> The planner crashes with an IndexOutOfBoundsException while applying 
> FlinkExpandConversionRule.
>  
> Relevant stack trace excerpt:
> {code:java}
> java.lang.RuntimeException: Error while applying rule FlinkExpandConversionRule, args [rel#266:AbstractConverter.BATCH_PHYSICAL.single.[1](...), rel#263:BatchPhysicalLocalHashAggregate.BATCH_PHYSICAL.any.[1](...,select=Partial_MAX(a) AS max$0)]
>     at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:250)
>     at org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:59)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:524)
>     ...
>     at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(...)
>     ...
> Caused by: java.lang.RuntimeException: Error occurred while applying rule FlinkExpandConversionRule
>     at org.apache.flink.table.planner.plan.rules.physical.FlinkExpandConversionRule.satisfyTraitsBySelf(FlinkExpandConversionRule.scala:72)
>     at org.apache.flink.table.planner.plan.rules.physical.FlinkExpandConversionRule.onMatch(FlinkExpandConversionRule.scala:52)
>     ...
> Caused by: java.lang.IndexOutOfBoundsException: index (1) must be less than size (1)
>     at com.google.common.base.Preconditions.checkElementIndex(Preconditions.java:1372)
>     at com.google.common.collect.SingletonImmutableList.get(SingletonImmutableList.java:47)
>     at org.apache.calcite.util.Util$TransformingList.get(Util.java:2804)
>     at org.apache.flink.table.planner.plan.utils.RelExplainUtil$.collationToString(RelExplainUtil.scala:85)
>     at org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSort.explainTerms(BatchPhysicalSort.scala:61)
>     at org.apache.calcite.rel.AbstractRelNode.deepHashCode(AbstractRelNode.java:398)
>     at org.apache.calcite.plan.volcano.VolcanoPlanner.registerImpl(VolcanoPlanner.java:1292)
>     ...{code}
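
To make the index mismatch described in the reproducer comment concrete, here is a small pure-Python model. It is not Flink or Calcite code: `FieldCollation`, `collation_fits` and `collation_to_string` are hypothetical names chosen for illustration, only loosely modeled on the `RelExplainUtil.collationToString` frame in the trace. The sketch assumes that the `[1]` in the trait strings of rel#266 and rel#263 is the collation on field index 1 carried over from ORDER BY b, while the aggregate's output row has a single field (`max$0`).

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class FieldCollation:
    """Hypothetical stand-in for one sort key: a field position plus a direction."""
    field_index: int  # position in the row type of the node carrying the collation
    direction: str = "ASC"


def collation_fits(collation: List[FieldCollation], width: int) -> bool:
    """Invariant a plan node should satisfy: every sort key indexes an existing field."""
    return all(0 <= c.field_index < width for c in collation)


def collation_to_string(collation: List[FieldCollation], field_names: List[str]) -> str:
    """Render sort keys by looking up field names by index (hypothetical helper)."""
    return ", ".join(f"{field_names[c.field_index]} {c.direction}" for c in collation)


input_fields = ["a", "b"]         # row type of the source table
agg_fields = ["max$0"]            # row type after the global MAX(a): width 1
order_by_b = [FieldCollation(1)]  # ORDER BY b -> sort key on input field index 1

print(collation_to_string(order_by_b, input_fields))  # b ASC
print(collation_fits(order_by_b, len(agg_fields)))    # False

try:
    # Rendering the stale [1] collation against the width-1 aggregate row fails,
    # analogous to SingletonImmutableList.get(1) in the reported trace.
    collation_to_string(order_by_b, agg_fields)
except IndexError as err:
    print("IndexError:", err)  # IndexError: list index out of range
```

In this model the sort key is valid for the two-field source row but not for the one-field aggregate row, which matches "index (1) must be less than size (1)" in the trace.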



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
