hayden zhou created FLINK-23353:
-----------------------------------
Summary: UDTAGG can't execute in Batch mode
Key: FLINK-23353
URL: https://issues.apache.org/jira/browse/FLINK-23353
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.13.1
Reporter: hayden zhou
{code:java}
public class Top2Test {
public static void main(String[] args) {
EnvironmentSettings settings =
EnvironmentSettings.newInstance().inBatchMode()build();
TableEnvironment tEnv = TableEnvironment.create(settings);
Table sourceTable = tEnv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.INT()),
DataTypes.FIELD("name",DataTypes.STRING()),
DataTypes.FIELD("price", DataTypes.INT())
),
row(1, "hayden", 18),
row(3, "hayden", 19),
row(4, "hayden", 20),
row(2, "jaylin", 20)
);
tEnv.createTemporaryView("source", sourceTable);
Table rT = tEnv.from("source")
.groupBy($("name"))
.flatAggregate(call(Top2.class, $("price")).as("price", "rank"))
.select($("name"), $("price"), $("rank"));
rT.execute().print();
}
public static class Top2Accumulator {
public Integer first;
public Integer second;
}
public static class Top2 extends TableAggregateFunction<Tuple2<Integer,
Integer>, Top2Accumulator> {
@Override
public Top2Accumulator createAccumulator() {
Top2Accumulator acc = new Top2Accumulator();
acc.first = Integer.MIN_VALUE;
acc.second = Integer.MIN_VALUE;
return acc;
}
public void accumulate(Top2Accumulator acc, Integer value) {
if (value > acc.first) {
acc.second = acc.first;
acc.first = value;
} else if (value > acc.second) {
acc.second = value;
}
}
public void merge(Top2Accumulator acc, Iterable<Top2Accumulator> it) {
for (Top2Accumulator otherAcc : it) {
accumulate(acc, otherAcc.first);
accumulate(acc, otherAcc.second);
}
}
public void emitValue(Top2Accumulator acc, Collector<Tuple2<Integer,
Integer>> out) {
if (acc.first != Integer.MIN_VALUE) {
out.collect(Tuple2.of(acc.first, 1));
}
if (acc.second != Integer.MIN_VALUE) {
out.collect(Tuple2.of(acc.second, 2));
}
}
}
}
{code}
Running this program fails with the following error:
Exception in thread "main" org.apache.flink.table.api.TableException: Cannot
generate a valid execution plan for the given query:
LogicalSink(table=[default_catalog.default_database.Unregistered_Collect_Sink_1],
fields=[name, price, rank])
+- LogicalProject(name=[AS($0, _UTF-16LE'name')], price=[AS($1,
_UTF-16LE'price')], rank=[AS($2, _UTF-16LE'rank')])
+- LogicalTableAggregate(group=[{1}],
tableAggregate=[[flinktest$Top2Test$Top2$4619034833a29d53c136506047509219($2)]])
+- LogicalUnion(all=[true])
:- LogicalProject(id=[CAST(1):INTEGER],
name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET
"UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"],
price=[CAST(18):INTEGER])
: +- LogicalValues(tuples=[[{ 0 }]])
:- LogicalProject(id=[CAST(3):INTEGER],
name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET
"UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"],
price=[CAST(19):INTEGER])
: +- LogicalValues(tuples=[[{ 0 }]])
:- LogicalProject(id=[CAST(4):INTEGER],
name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647) CHARACTER SET
"UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"],
price=[CAST(20):INTEGER])
: +- LogicalValues(tuples=[[{ 0 }]])
+- LogicalProject(id=[CAST(2):INTEGER],
name=[CAST(_UTF-16LE'jaylin':VARCHAR(2147483647) CHARACTER SET
"UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"],
price=[CAST(20):INTEGER])
+- LogicalValues(tuples=[[{ 0 }]])
This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
at
org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:72)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
at
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
at
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:87)
at
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
at
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:46)
at
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:46)
at scala.collection.immutable.List.foreach(List.scala:392)
at
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:46)
at
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
at
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:791)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1225)
at
org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:577)
at flinktest.Top2Test.main(Top2Test.java:37)
Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are
not enough rules to produce a node with desired properties: convention=LOGICAL,
FlinkRelDistributionTraitDef=any, sort=[].
Missing conversion is LogicalTableAggregate[convention: NONE -> LOGICAL]
There is 1 empty subset: rel#436:RelSubset#6.LOGICAL.any.[], the relevant part
of the original plan is as follows
409:LogicalTableAggregate(group=[{1}],
tableAggregate=[[flinktest$Top2Test$Top2$4619034833a29d53c136506047509219($2)]])
407:LogicalUnion(subset=[rel#408:RelSubset#5.NONE.any.[]], all=[true])
399:LogicalProject(subset=[rel#400:RelSubset#1.NONE.any.[]],
id=[CAST(1):INTEGER], name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647)
CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"],
price=[CAST(18):INTEGER])
0:LogicalValues(subset=[rel#398:RelSubset#0.NONE.any.[0]], tuples=[[{ 0
}]])
401:LogicalProject(subset=[rel#402:RelSubset#2.NONE.any.[]],
id=[CAST(3):INTEGER], name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647)
CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"],
price=[CAST(19):INTEGER])
0:LogicalValues(subset=[rel#398:RelSubset#0.NONE.any.[0]], tuples=[[{ 0
}]])
403:LogicalProject(subset=[rel#404:RelSubset#3.NONE.any.[]],
id=[CAST(4):INTEGER], name=[CAST(_UTF-16LE'hayden':VARCHAR(2147483647)
CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"],
price=[CAST(20):INTEGER])
0:LogicalValues(subset=[rel#398:RelSubset#0.NONE.any.[0]], tuples=[[{ 0
}]])
405:LogicalProject(subset=[rel#406:RelSubset#4.NONE.any.[]],
id=[CAST(2):INTEGER], name=[CAST(_UTF-16LE'jaylin':VARCHAR(2147483647)
CHARACTER SET "UTF-16LE"):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"],
price=[CAST(20):INTEGER])
0:LogicalValues(subset=[rel#398:RelSubset#0.NONE.any.[0]], tuples=[[{ 0
}]])
If the inBatchMode() call is removed from
{code:java}
EnvironmentSettings settings =
EnvironmentSettings.newInstance().inBatchMode().build();
{code}
then the program runs normally.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)