[ https://issues.apache.org/jira/browse/FLINK-28764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jark Wu reassigned FLINK-28764: ------------------------------- Assignee: Wei Zhong > Support more than 64 distinct aggregate function calls in one aggregate SQL > query > --------------------------------------------------------------------------------- > > Key: FLINK-28764 > URL: https://issues.apache.org/jira/browse/FLINK-28764 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner > Affects Versions: 1.13.6, 1.14.5, 1.15.1 > Reporter: Wei Zhong > Assignee: Wei Zhong > Priority: Major > Fix For: 1.16.0 > > > Currently Flink SQL does not support more than 64 distinct aggregate function > calls in one aggregate SQL query. We encountered this problem while migrating > batch jobs from Spark to Flink. The Spark job has 79 distinct aggregate > function calls in one aggregate SQL query. > Code to reproduce: > {code:java} > public class Test64Distinct { > public static void main(String[] args) { > TableEnvironment tableEnv = > TableEnvironment.create(EnvironmentSettings.inBatchMode()); > tableEnv.executeSql("create table datagen_source(id BIGINT, val > BIGINT) with " + > "('connector'='datagen', 'number-of-rows'='1000')"); > tableEnv.executeSql("select " + > "count(distinct val * 1), " + > "count(distinct val * 2), " + > "count(distinct val * 3), " + > "count(distinct val * 4), " + > "count(distinct val * 5), " + > "count(distinct val * 6), " + > "count(distinct val * 7), " + > "count(distinct val * 8), " + > "count(distinct val * 9), " + > "count(distinct val * 10), " + > "count(distinct val * 11), " + > "count(distinct val * 12), " + > "count(distinct val * 13), " + > "count(distinct val * 14), " + > "count(distinct val * 15), " + > "count(distinct val * 16), " + > "count(distinct val * 17), " + > "count(distinct val * 18), " + > "count(distinct val * 19), " + > "count(distinct val * 20), " + > "count(distinct val * 21), " + > "count(distinct val * 22), " + > "count(distinct val * 23), " + > "count(distinct val * 24), " + > "count(distinct val * 25), " + > 
"count(distinct val * 26), " + > "count(distinct val * 27), " + > "count(distinct val * 28), " + > "count(distinct val * 29), " + > "count(distinct val * 30), " + > "count(distinct val * 31), " + > "count(distinct val * 32), " + > "count(distinct val * 33), " + > "count(distinct val * 34), " + > "count(distinct val * 35), " + > "count(distinct val * 36), " + > "count(distinct val * 37), " + > "count(distinct val * 38), " + > "count(distinct val * 39), " + > "count(distinct val * 40), " + > "count(distinct val * 41), " + > "count(distinct val * 42), " + > "count(distinct val * 43), " + > "count(distinct val * 44), " + > "count(distinct val * 45), " + > "count(distinct val * 46), " + > "count(distinct val * 47), " + > "count(distinct val * 48), " + > "count(distinct val * 49), " + > "count(distinct val * 50), " + > "count(distinct val * 51), " + > "count(distinct val * 52), " + > "count(distinct val * 53), " + > "count(distinct val * 54), " + > "count(distinct val * 55), " + > "count(distinct val * 56), " + > "count(distinct val * 57), " + > "count(distinct val * 58), " + > "count(distinct val * 59), " + > "count(distinct val * 60), " + > "count(distinct val * 61), " + > "count(distinct val * 62), " + > "count(distinct val * 63), " + > "count(distinct val * 64), " + > "count(distinct val * 65) from datagen_source").print(); > } > } {code} > Exception: > {code:java} > Exception in thread "main" org.apache.flink.table.api.TableException: Sql > optimization: Cannot generate a valid execution plan for the given query: > LogicalSink(table=[*anonymous_collect$1*], fields=[EXPR$0, EXPR$1, EXPR$2, > EXPR$3, EXPR$4, EXPR$5, EXPR$6, EXPR$7, EXPR$8, EXPR$9, EXPR$10, EXPR$11, > EXPR$12, EXPR$13, EXPR$14, EXPR$15, EXPR$16, EXPR$17, EXPR$18, EXPR$19, > EXPR$20, EXPR$21, EXPR$22, EXPR$23, EXPR$24, EXPR$25, EXPR$26, EXPR$27, > EXPR$28, EXPR$29, EXPR$30, EXPR$31, EXPR$32, EXPR$33, EXPR$34, EXPR$35, > EXPR$36, EXPR$37, EXPR$38, EXPR$39, EXPR$40, EXPR$41, EXPR$42, EXPR$43, > EXPR$44, 
EXPR$45, EXPR$46, EXPR$47, EXPR$48, EXPR$49, EXPR$50, EXPR$51, > EXPR$52, EXPR$53, EXPR$54, EXPR$55, EXPR$56, EXPR$57, EXPR$58, EXPR$59, > EXPR$60, EXPR$61, EXPR$62, EXPR$63, EXPR$64]) > +- LogicalAggregate(group=[{}], EXPR$0=[COUNT(DISTINCT $0)], > EXPR$1=[COUNT(DISTINCT $1)], EXPR$2=[COUNT(DISTINCT $2)], > EXPR$3=[COUNT(DISTINCT $3)], EXPR$4=[COUNT(DISTINCT $4)], > EXPR$5=[COUNT(DISTINCT $5)], EXPR$6=[COUNT(DISTINCT $6)], > EXPR$7=[COUNT(DISTINCT $7)], EXPR$8=[COUNT(DISTINCT $8)], > EXPR$9=[COUNT(DISTINCT $9)], EXPR$10=[COUNT(DISTINCT $10)], > EXPR$11=[COUNT(DISTINCT $11)], EXPR$12=[COUNT(DISTINCT $12)], > EXPR$13=[COUNT(DISTINCT $13)], EXPR$14=[COUNT(DISTINCT $14)], > EXPR$15=[COUNT(DISTINCT $15)], EXPR$16=[COUNT(DISTINCT $16)], > EXPR$17=[COUNT(DISTINCT $17)], EXPR$18=[COUNT(DISTINCT $18)], > EXPR$19=[COUNT(DISTINCT $19)], EXPR$20=[COUNT(DISTINCT $20)], > EXPR$21=[COUNT(DISTINCT $21)], EXPR$22=[COUNT(DISTINCT $22)], > EXPR$23=[COUNT(DISTINCT $23)], EXPR$24=[COUNT(DISTINCT $24)], > EXPR$25=[COUNT(DISTINCT $25)], EXPR$26=[COUNT(DISTINCT $26)], > EXPR$27=[COUNT(DISTINCT $27)], EXPR$28=[COUNT(DISTINCT $28)], > EXPR$29=[COUNT(DISTINCT $29)], EXPR$30=[COUNT(DISTINCT $30)], > EXPR$31=[COUNT(DISTINCT $31)], EXPR$32=[COUNT(DISTINCT $32)], > EXPR$33=[COUNT(DISTINCT $33)], EXPR$34=[COUNT(DISTINCT $34)], > EXPR$35=[COUNT(DISTINCT $35)], EXPR$36=[COUNT(DISTINCT $36)], > EXPR$37=[COUNT(DISTINCT $37)], EXPR$38=[COUNT(DISTINCT $38)], > EXPR$39=[COUNT(DISTINCT $39)], EXPR$40=[COUNT(DISTINCT $40)], > EXPR$41=[COUNT(DISTINCT $41)], EXPR$42=[COUNT(DISTINCT $42)], > EXPR$43=[COUNT(DISTINCT $43)], EXPR$44=[COUNT(DISTINCT $44)], > EXPR$45=[COUNT(DISTINCT $45)], EXPR$46=[COUNT(DISTINCT $46)], > EXPR$47=[COUNT(DISTINCT $47)], EXPR$48=[COUNT(DISTINCT $48)], > EXPR$49=[COUNT(DISTINCT $49)], EXPR$50=[COUNT(DISTINCT $50)], > EXPR$51=[COUNT(DISTINCT $51)], EXPR$52=[COUNT(DISTINCT $52)], > EXPR$53=[COUNT(DISTINCT $53)], EXPR$54=[COUNT(DISTINCT $54)], > EXPR$55=[COUNT(DISTINCT $55)], 
EXPR$56=[COUNT(DISTINCT $56)], > EXPR$57=[COUNT(DISTINCT $57)], EXPR$58=[COUNT(DISTINCT $58)], > EXPR$59=[COUNT(DISTINCT $59)], EXPR$60=[COUNT(DISTINCT $60)], > EXPR$61=[COUNT(DISTINCT $61)], EXPR$62=[COUNT(DISTINCT $62)], > EXPR$63=[COUNT(DISTINCT $63)], EXPR$64=[COUNT(DISTINCT $64)]) > +- LogicalProject(exprs=[[*($1, 1), *($1, 2), *($1, 3), *($1, 4), *($1, > 5), *($1, 6), *($1, 7), *($1, 8), *($1, 9), *($1, 10), *($1, 11), *($1, 12), > *($1, 13), *($1, 14), *($1, 15), *($1, 16), *($1, 17), *($1, 18), *($1, 19), > *($1, 20), *($1, 21), *($1, 22), *($1, 23), *($1, 24), *($1, 25), *($1, 26), > *($1, 27), *($1, 28), *($1, 29), *($1, 30), *($1, 31), *($1, 32), *($1, 33), > *($1, 34), *($1, 35), *($1, 36), *($1, 37), *($1, 38), *($1, 39), *($1, 40), > *($1, 41), *($1, 42), *($1, 43), *($1, 44), *($1, 45), *($1, 46), *($1, 47), > *($1, 48), *($1, 49), *($1, 50), *($1, 51), *($1, 52), *($1, 53), *($1, 54), > *($1, 55), *($1, 56), *($1, 57), *($1, 58), *($1, 59), *($1, 60), *($1, 61), > *($1, 62), *($1, 63), *($1, 64), *($1, 65)]]) > +- LogicalTableScan(table=[[default_catalog, default_database, > datagen_source]])group count must be less than 64. > Please check the documentation for the set of currently supported SQL > features. 
> at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:86) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59) > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) > at > scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) > at scala.collection.Iterator.foreach(Iterator.scala:937) > at scala.collection.Iterator.foreach$(Iterator.scala:937) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) > at scala.collection.IterableLike.foreach(IterableLike.scala:70) > at scala.collection.IterableLike.foreach$(IterableLike.scala:69) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) > at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55) > at > org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:92) > at > org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:57) > at > org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:44) > at > org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:44) > at scala.collection.immutable.List.foreach(List.scala:388) > at > org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:44) > at > 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:78) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:312) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:192) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1688) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:840) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1342) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:708) > at com.shopee.di.Test64Distinct.main(Test64Distinct.java:11) > Caused by: org.apache.flink.table.api.TableException: group count must be > less than 64. > at > org.apache.flink.table.planner.plan.rules.logical.DecomposeGroupingSetsRule.onMatch(DecomposeGroupingSetsRule.scala:177) > at > org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:229) > at > org.apache.calcite.plan.volcano.IterativeRuleDriver.drive(IterativeRuleDriver.java:58) > at > org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:510) > at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:312) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:62) > ... 27 more {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)