[ 
https://issues.apache.org/jira/browse/FLINK-28764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-28764:
-------------------------------

    Assignee: Wei Zhong

> Support more than 64 distinct aggregate function calls in one aggregate SQL 
> query
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-28764
>                 URL: https://issues.apache.org/jira/browse/FLINK-28764
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.13.6, 1.14.5, 1.15.1
>            Reporter: Wei Zhong
>            Assignee: Wei Zhong
>            Priority: Major
>             Fix For: 1.16.0
>
>
> Currently, Flink SQL does not support more than 64 distinct aggregate function 
> calls in one aggregate SQL query. We encountered this problem while migrating 
> batch jobs from Spark to Flink. The Spark job has 79 distinct aggregate 
> function calls in one aggregate SQL query.
> Code to reproduce:
> {code:java}
> public class Test64Distinct {
>     public static void main(String[] args) {
>         TableEnvironment tableEnv = 
> TableEnvironment.create(EnvironmentSettings.inBatchMode());
>         tableEnv.executeSql("create table datagen_source(id BIGINT, val 
> BIGINT) with " +
>                 "('connector'='datagen', 'number-of-rows'='1000')");
>         tableEnv.executeSql("select " +
>                 "count(distinct val * 1), " +
>                 "count(distinct val * 2), " +
>                 "count(distinct val * 3), " +
>                 "count(distinct val * 4), " +
>                 "count(distinct val * 5), " +
>                 "count(distinct val * 6), " +
>                 "count(distinct val * 7), " +
>                 "count(distinct val * 8), " +
>                 "count(distinct val * 9), " +
>                 "count(distinct val * 10), " +
>                 "count(distinct val * 11), " +
>                 "count(distinct val * 12), " +
>                 "count(distinct val * 13), " +
>                 "count(distinct val * 14), " +
>                 "count(distinct val * 15), " +
>                 "count(distinct val * 16), " +
>                 "count(distinct val * 17), " +
>                 "count(distinct val * 18), " +
>                 "count(distinct val * 19), " +
>                 "count(distinct val * 20), " +
>                 "count(distinct val * 21), " +
>                 "count(distinct val * 22), " +
>                 "count(distinct val * 23), " +
>                 "count(distinct val * 24), " +
>                 "count(distinct val * 25), " +
>                 "count(distinct val * 26), " +
>                 "count(distinct val * 27), " +
>                 "count(distinct val * 28), " +
>                 "count(distinct val * 29), " +
>                 "count(distinct val * 30), " +
>                 "count(distinct val * 31), " +
>                 "count(distinct val * 32), " +
>                 "count(distinct val * 33), " +
>                 "count(distinct val * 34), " +
>                 "count(distinct val * 35), " +
>                 "count(distinct val * 36), " +
>                 "count(distinct val * 37), " +
>                 "count(distinct val * 38), " +
>                 "count(distinct val * 39), " +
>                 "count(distinct val * 40), " +
>                 "count(distinct val * 41), " +
>                 "count(distinct val * 42), " +
>                 "count(distinct val * 43), " +
>                 "count(distinct val * 44), " +
>                 "count(distinct val * 45), " +
>                 "count(distinct val * 46), " +
>                 "count(distinct val * 47), " +
>                 "count(distinct val * 48), " +
>                 "count(distinct val * 49), " +
>                 "count(distinct val * 50), " +
>                 "count(distinct val * 51), " +
>                 "count(distinct val * 52), " +
>                 "count(distinct val * 53), " +
>                 "count(distinct val * 54), " +
>                 "count(distinct val * 55), " +
>                 "count(distinct val * 56), " +
>                 "count(distinct val * 57), " +
>                 "count(distinct val * 58), " +
>                 "count(distinct val * 59), " +
>                 "count(distinct val * 60), " +
>                 "count(distinct val * 61), " +
>                 "count(distinct val * 62), " +
>                 "count(distinct val * 63), " +
>                 "count(distinct val * 64), " +
>                 "count(distinct val * 65) from datagen_source").print();
>     }
> } {code}
> Exception:
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.TableException: Sql 
> optimization: Cannot generate a valid execution plan for the given query: 
> LogicalSink(table=[*anonymous_collect$1*], fields=[EXPR$0, EXPR$1, EXPR$2, 
> EXPR$3, EXPR$4, EXPR$5, EXPR$6, EXPR$7, EXPR$8, EXPR$9, EXPR$10, EXPR$11, 
> EXPR$12, EXPR$13, EXPR$14, EXPR$15, EXPR$16, EXPR$17, EXPR$18, EXPR$19, 
> EXPR$20, EXPR$21, EXPR$22, EXPR$23, EXPR$24, EXPR$25, EXPR$26, EXPR$27, 
> EXPR$28, EXPR$29, EXPR$30, EXPR$31, EXPR$32, EXPR$33, EXPR$34, EXPR$35, 
> EXPR$36, EXPR$37, EXPR$38, EXPR$39, EXPR$40, EXPR$41, EXPR$42, EXPR$43, 
> EXPR$44, EXPR$45, EXPR$46, EXPR$47, EXPR$48, EXPR$49, EXPR$50, EXPR$51, 
> EXPR$52, EXPR$53, EXPR$54, EXPR$55, EXPR$56, EXPR$57, EXPR$58, EXPR$59, 
> EXPR$60, EXPR$61, EXPR$62, EXPR$63, EXPR$64])
> +- LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], 
> EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[COUNT(DISTINCT $2)], 
> EXPR$3=[COUNT(DISTINCT $3)], EXPR$4=[COUNT(DISTINCT $4)], 
> EXPR$5=[COUNT(DISTINCT $5)], EXPR$6=[COUNT(DISTINCT $6)], 
> EXPR$7=[COUNT(DISTINCT $7)], EXPR$8=[COUNT(DISTINCT $8)], 
> EXPR$9=[COUNT(DISTINCT $9)], EXPR$10=[COUNT(DISTINCT $10)], 
> EXPR$11=[COUNT(DISTINCT $11)], EXPR$12=[COUNT(DISTINCT $12)], 
> EXPR$13=[COUNT(DISTINCT $13)], EXPR$14=[COUNT(DISTINCT $14)], 
> EXPR$15=[COUNT(DISTINCT $15)], EXPR$16=[COUNT(DISTINCT $16)], 
> EXPR$17=[COUNT(DISTINCT $17)], EXPR$18=[COUNT(DISTINCT $18)], 
> EXPR$19=[COUNT(DISTINCT $19)], EXPR$20=[COUNT(DISTINCT $20)], 
> EXPR$21=[COUNT(DISTINCT $21)], EXPR$22=[COUNT(DISTINCT $22)], 
> EXPR$23=[COUNT(DISTINCT $23)], EXPR$24=[COUNT(DISTINCT $24)], 
> EXPR$25=[COUNT(DISTINCT $25)], EXPR$26=[COUNT(DISTINCT $26)], 
> EXPR$27=[COUNT(DISTINCT $27)], EXPR$28=[COUNT(DISTINCT $28)], 
> EXPR$29=[COUNT(DISTINCT $29)], EXPR$30=[COUNT(DISTINCT $30)], 
> EXPR$31=[COUNT(DISTINCT $31)], EXPR$32=[COUNT(DISTINCT $32)], 
> EXPR$33=[COUNT(DISTINCT $33)], EXPR$34=[COUNT(DISTINCT $34)], 
> EXPR$35=[COUNT(DISTINCT $35)], EXPR$36=[COUNT(DISTINCT $36)], 
> EXPR$37=[COUNT(DISTINCT $37)], EXPR$38=[COUNT(DISTINCT $38)], 
> EXPR$39=[COUNT(DISTINCT $39)], EXPR$40=[COUNT(DISTINCT $40)], 
> EXPR$41=[COUNT(DISTINCT $41)], EXPR$42=[COUNT(DISTINCT $42)], 
> EXPR$43=[COUNT(DISTINCT $43)], EXPR$44=[COUNT(DISTINCT $44)], 
> EXPR$45=[COUNT(DISTINCT $45)], EXPR$46=[COUNT(DISTINCT $46)], 
> EXPR$47=[COUNT(DISTINCT $47)], EXPR$48=[COUNT(DISTINCT $48)], 
> EXPR$49=[COUNT(DISTINCT $49)], EXPR$50=[COUNT(DISTINCT $50)], 
> EXPR$51=[COUNT(DISTINCT $51)], EXPR$52=[COUNT(DISTINCT $52)], 
> EXPR$53=[COUNT(DISTINCT $53)], EXPR$54=[COUNT(DISTINCT $54)], 
> EXPR$55=[COUNT(DISTINCT $55)], EXPR$56=[COUNT(DISTINCT $56)], 
> EXPR$57=[COUNT(DISTINCT $57)], EXPR$58=[COUNT(DISTINCT $58)], 
> EXPR$59=[COUNT(DISTINCT $59)], EXPR$60=[COUNT(DISTINCT $60)], 
> EXPR$61=[COUNT(DISTINCT $61)], EXPR$62=[COUNT(DISTINCT $62)], 
> EXPR$63=[COUNT(DISTINCT $63)], EXPR$64=[COUNT(DISTINCT $64)])
>    +- LogicalProject(exprs=[[*($1, 1), *($1, 2), *($1, 3), *($1, 4), *($1, 
> 5), *($1, 6), *($1, 7), *($1, 8), *($1, 9), *($1, 10), *($1, 11), *($1, 12), 
> *($1, 13), *($1, 14), *($1, 15), *($1, 16), *($1, 17), *($1, 18), *($1, 19), 
> *($1, 20), *($1, 21), *($1, 22), *($1, 23), *($1, 24), *($1, 25), *($1, 26), 
> *($1, 27), *($1, 28), *($1, 29), *($1, 30), *($1, 31), *($1, 32), *($1, 33), 
> *($1, 34), *($1, 35), *($1, 36), *($1, 37), *($1, 38), *($1, 39), *($1, 40), 
> *($1, 41), *($1, 42), *($1, 43), *($1, 44), *($1, 45), *($1, 46), *($1, 47), 
> *($1, 48), *($1, 49), *($1, 50), *($1, 51), *($1, 52), *($1, 53), *($1, 54), 
> *($1, 55), *($1, 56), *($1, 57), *($1, 58), *($1, 59), *($1, 60), *($1, 61), 
> *($1, 62), *($1, 63), *($1, 64), *($1, 65)]])
>       +- LogicalTableScan(table=[[default_catalog, default_database, 
> datagen_source]])group count must be less than 64.
> Please check the documentation for the set of currently supported SQL 
> features.
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:86)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
>     at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
>     at 
> scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
>     at scala.collection.Iterator.foreach(Iterator.scala:937)
>     at scala.collection.Iterator.foreach$(Iterator.scala:937)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:70)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>     at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
>     at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
>     at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
>     at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:92)
>     at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:57)
>     at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:44)
>     at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:44)
>     at scala.collection.immutable.List.foreach(List.scala:388)
>     at 
> org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:44)
>     at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:78)
>     at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:312)
>     at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:192)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1688)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:840)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1342)
>     at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:708)
>     at com.shopee.di.Test64Distinct.main(Test64Distinct.java:11)
> Caused by: org.apache.flink.table.api.TableException: group count must be 
> less than 64.
>     at 
> org.apache.flink.table.planner.plan.rules.logical.DecomposeGroupingSetsRule.onMatch(DecomposeGroupingSetsRule.scala:177)
>     at 
> org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:229)
>     at 
> org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58)
>     at 
> org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510)
>     at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312)
>     at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62)
>     ... 27 more {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to