hayden zhou created FLINK-23353: ----------------------------------- Summary: UDTAGG can't execute in Batch mode Key: FLINK-23353 URL: https://issues.apache.org/jira/browse/FLINK-23353 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 1.13.1 Reporter: hayden zhou
{code:java} public class Top2Test { public static void main(String[] args) { EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); TableEnvironment tEnv = TableEnvironment.create(settings); Table sourceTable = tEnv.fromValues( DataTypes.ROW( DataTypes.FIELD("id", DataTypes.INT()), DataTypes.FIELD("name",DataTypes.STRING()), DataTypes.FIELD("price", DataTypes.INT()) ), row(1, "hayden", 18), row(3, "hayden", 19), row(4, "hayden", 20), row(2, "jaylin", 20) ); tEnv.createTemporaryView("source", sourceTable); Table rT = tEnv.from("source") .groupBy($("name")) .flatAggregate(call(Top2.class, $("price")).as("price", "rank")) .select($("name"), $("price"), $("rank")); rT.execute().print(); } public static class Top2Accumulator { public Integer first; public Integer second; } public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accumulator> { @Override public Top2Accumulator createAccumulator() { Top2Accumulator acc = new Top2Accumulator(); acc.first = Integer.MIN_VALUE; acc.second = Integer.MIN_VALUE; return acc; } public void accumulate(Top2Accumulator acc, Integer value) { if (value > acc.first) { acc.second = acc.first; acc.first = value; } else if (value > acc.second) { acc.second = value; } } public void merge(Top2Accumulator acc, Iterable<Top2Accumulator> it) { for (Top2Accumulator otherAcc : it) { accumulate(acc, otherAcc.first); accumulate(acc, otherAcc.second); } } public void emitValue(Top2Accumulator acc, Collector<Tuple2<Integer, Integer>> out) { if (acc.first != Integer.MIN_VALUE) { out.collect(Tuple2.of(acc.first, 1)); } if (acc.second != Integer.MIN_VALUE) { out.collect(Tuple2.of(acc.second, 2)); } } } } {code} Running this program fails with the following error: Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: LogicalSink(table=[default_catalog.default_database.Unregistered_Collect_Sink_1], fields=[name, price, rank]) +- 
LogicalProject(name=[AS($0, _UTF-16LE'name')], price=[AS($1, _UTF-16LE'price')], rank=[AS($2, _UTF-16LE'rank')]) +- LogicalTableAggregate(group=[{1}], tableAggregate=[[flinktest$Top2Test$Top2$4619034833a29d53c136506047509219($2)]]) +- LogicalUnion(all=[true]) :- LogicalProject(id=[CAST(1):INTEGER], name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], price=[CAST(18):INTEGER]) : +- LogicalValues(tuples=[[{ 0 }]]) :- LogicalProject(id=[CAST(3):INTEGER], name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], price=[CAST(19):INTEGER]) : +- LogicalValues(tuples=[[{ 0 }]]) :- LogicalProject(id=[CAST(4):INTEGER], name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], price=[CAST(20):INTEGER]) : +- LogicalValues(tuples=[[{ 0 }]]) +- LogicalProject(id=[CAST(2):INTEGER], name=[CAST(_UTF-16LE'jaylin':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], price=[CAST(20):INTEGER]) +- LogicalValues(tuples=[[{ 0 }]]) This exception indicates that the query uses an unsupported SQL feature. Please check the documentation for the set of currently supported SQL features. 
at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:72) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:87) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:46) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:46) at scala.collection.immutable.List.foreach(List.scala:392) at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:46) at 
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:791) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1225) at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:577) at flinktest.Top2Test.main(Top2Test.java:37) Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=LOGICAL, FlinkRelDistributionTraitDef=any, sort=[]. Missing conversion is LogicalTableAggregate[convention: NONE -> LOGICAL] There is 1 empty subset: rel#436:RelSubset#6.LOGICAL.any.[], the relevant part of the original plan is as follows 409:LogicalTableAggregate(group=[{1}], tableAggregate=[[flinktest$Top2Test$Top2$4619034833a29d53c136506047509219($2)]]) 407:LogicalUnion(subset=[rel#408:RelSubset#5.NONE.any.[]], all=[true]) 399:LogicalProject(subset=[rel#400:RelSubset#1.NONE.any.[]], id=[CAST(1):INTEGER], name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], price=[CAST(18):INTEGER]) 0:LogicalValues(subset=[rel#398:RelSubset#0.NONE.any.[0]], tuples=[[{ 0 }]]) 401:LogicalProject(subset=[rel#402:RelSubset#2.NONE.any.[]], id=[CAST(3):INTEGER], name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], price=[CAST(19):INTEGER]) 0:LogicalValues(subset=[rel#398:RelSubset#0.NONE.any.[0]], tuples=[[{ 0 }]]) 
403:LogicalProject(subset=[rel#404:RelSubset#3.NONE.any.[]], id=[CAST(4):INTEGER], name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], price=[CAST(20):INTEGER]) 0:LogicalValues(subset=[rel#398:RelSubset#0.NONE.any.[0]], tuples=[[{ 0 }]]) 405:LogicalProject(subset=[rel#406:RelSubset#4.NONE.any.[]], id=[CAST(2):INTEGER], name=[CAST(_UTF-16LE'jaylin':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"], price=[CAST(20):INTEGER]) 0:LogicalValues(subset=[rel#398:RelSubset#0.NONE.any.[0]], tuples=[[{ 0 }]]) If the inBatchMode() call is removed from {code:java} EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); {code} then the program runs normally. -- This message was sent by Atlassian Jira (v8.3.4#803005)