Flink 版本:1.13.5
函数完整代码如下: ``` public class Top2RetractTableAggregateFunction extends TableAggregateFunction<Tuple2<Long, Integer>, Top2RetractTableAggregateFunction.Top2RetractAccumulator> { private static final Logger LOG = LoggerFactory.getLogger(Top2RetractTableAggregateFunction.class); // Top2 聚合中间结果数据结构 public static class Top2RetractAccumulator { public long beforeFirst = 0; public long beforeSecond = 0; public long afterFirst = 0; public long afterSecond = 0; } // 创建 Top2Accumulator 累加器并做初始化 @Override public Top2RetractAccumulator createAccumulator() { LOG.info("[INFO] createAccumulator ..........................."); Top2RetractAccumulator acc = new Top2RetractAccumulator(); acc.beforeFirst = Integer.MIN_VALUE; acc.beforeSecond = Integer.MIN_VALUE; acc.afterFirst = Integer.MIN_VALUE; acc.afterSecond = Integer.MIN_VALUE; return acc; } // 接收输入元素并累加到 Accumulator 数据结构 public void accumulate(Top2RetractAccumulator acc, Long value) { LOG.info("[INFO] accumulate ..........................."); if (value > acc.afterFirst) { acc.afterSecond = acc.afterFirst; acc.afterFirst = value; } else if (value > acc.afterSecond) { acc.afterSecond = value; } } // 带撤回的输出 public void emitUpdateWithRetract(Top2RetractAccumulator acc, RetractableCollector<Tuple2<Long, Integer>> out) { LOG.info("[INFO] emitUpdateWithRetract ..........................."); if (!Objects.equals(acc.afterFirst, acc.beforeFirst)) { // 撤回旧记录 if (acc.beforeFirst != Integer.MIN_VALUE) { out.retract(Tuple2.of(acc.beforeFirst, 1)); } // 输出新记录 out.collect(Tuple2.of(acc.afterFirst, 1)); acc.beforeFirst = acc.afterFirst; } if (!Objects.equals(acc.afterSecond, acc.beforeSecond)) { // 撤回旧记录 if (acc.beforeSecond != Integer.MIN_VALUE) { out.retract(Tuple2.of(acc.beforeSecond, 2)); } // 输出新记录 out.collect(Tuple2.of(acc.afterSecond, 2)); acc.beforeSecond = acc.afterSecond; } } } ``` 调用完整代码如下: ``` // 执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); EnvironmentSettings 
settings = EnvironmentSettings .newInstance() .useOldPlanner() // Blink Planner 异常 Old Planner 可以 .inStreamingMode() .build(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); DataStream<Row> sourceStream = env.fromElements( Row.of("李雷", "语文", 78), Row.of("韩梅梅", "语文", 50), Row.of("李雷", "语文", 99), Row.of("韩梅梅", "语文", 80), Row.of("李雷", "英语", 90), Row.of("韩梅梅", "英语", 40), Row.of("李雷", "英语", 98), Row.of("韩梅梅", "英语", 88) ); // 注册虚拟表 tEnv.createTemporaryView("stu_score", sourceStream, $("name"), $("course"), $("score")); // 注册临时系统函数 tEnv.createTemporarySystemFunction("Top2", new Top2RetractTableAggregateFunction()); // 调用函数 tEnv.from("stu_score") .groupBy($("course")) .flatAggregate(call("Top2", $("score")).as("score", "rank")) .select($("course"), $("score"), $("rank")) .execute() .print(); ``` 在 2022-05-23 18:21:42,"sjf0115" <si_ji_f...@163.com> 写道: >函数代码如下:<br/>```<br/>public class Top2RetractTableAggregateFunction extends >TableAggregateFunction<Tuple2<Long, Integer>, >Top2RetractTableAggregateFunction.Top2RetractAccumulator> {<br/> private >static final Logger LOG = >LoggerFactory.getLogger(Top2RetractTableAggregateFunction.class);<br/> // >Top2 聚合中间结果数据结构<br/> public static class Top2RetractAccumulator {<br/> > public long beforeFirst = 0;<br/> public long beforeSecond = 0;<br/> > public long afterFirst = 0;<br/> public long afterSecond = >0;<br/> }<br/><br/> // 创建 Top2Accumulator 累加器并做初始化<br/> >@Override<br/> public Top2RetractAccumulator createAccumulator() {<br/> > LOG.info("[INFO] createAccumulator ...........................");<br/> > Top2RetractAccumulator acc = new Top2RetractAccumulator();<br/> >acc.beforeFirst = Integer.MIN_VALUE;<br/> acc.beforeSecond = >Integer.MIN_VALUE;<br/> acc.afterFirst = Integer.MIN_VALUE;<br/> >acc.afterSecond = Integer.MIN_VALUE;<br/> return acc;<br/> >}<br/><br/> // 接收输入元素并累加到 Accumulator 数据结构<br/> public void >accumulate(Top2RetractAccumulator acc, Long value) {<br/> >LOG.info("[INFO] accumulate 
...........................");<br/> if >(value > acc.afterFirst) {<br/> acc.afterSecond = >acc.afterFirst;<br/> acc.afterFirst = value;<br/> } else if >(value > acc.afterSecond) {<br/> acc.afterSecond = value;<br/> > }<br/> }<br/><br/> // 带撤回的输出<br/> public void >emitUpdateWithRetract(Top2RetractAccumulator acc, >RetractableCollector<Tuple2<Long, Integer>> out) {<br/> >LOG.info("[INFO] emitUpdateWithRetract ...........................");<br/> > if (!Objects.equals(acc.afterFirst, acc.beforeFirst)) {<br/> // >撤回旧记录<br/> if (acc.beforeFirst != Integer.MIN_VALUE) {<br/> > out.retract(Tuple2.of(acc.beforeFirst, 1));<br/> }<br/> > // 输出新记录<br/> out.collect(Tuple2.of(acc.afterFirst, 1));<br/> > acc.beforeFirst = acc.afterFirst;<br/> }<br/> if >(!Objects.equals(acc.afterSecond, acc.beforeSecond)) {<br/> // >撤回旧记录<br/> if (acc.beforeSecond != Integer.MIN_VALUE) {<br/> > out.retract(Tuple2.of(acc.beforeSecond, 2));<br/> }<br/> > // 输出新记录<br/> out.collect(Tuple2.of(acc.afterSecond, >2));<br/> acc.beforeSecond = acc.afterSecond;<br/> }<br/> >}<br/>}<br/>```<br/>完整调用代码:<br/>```<br/>// 执行环境<br/>StreamExecutionEnvironment >env = >StreamExecutionEnvironment.getExecutionEnvironment();<br/>env.setParallelism(1);<br/>EnvironmentSettings > settings = EnvironmentSettings<br/> .newInstance()<br/> >.useOldPlanner() // Blink Planner 异常 Old Planner 可以<br/> >.inStreamingMode()<br/> .build();<br/>StreamTableEnvironment tEnv = >StreamTableEnvironment.create(env, settings);<br/><br/>DataStream<Row> >sourceStream = env.fromElements(<br/> Row.of("李雷", "语文", 78),<br/> > Row.of("韩梅梅", "语文", 50),<br/> Row.of("李雷", "语文", 99),<br/> >Row.of("韩梅梅", "语文", 80),<br/> Row.of("李雷", "英语", 90),<br/> >Row.of("韩梅梅", "英语", 40),<br/> Row.of("李雷", "英语", 98),<br/> >Row.of("韩梅梅", "英语", 88)<br/>);<br/><br/>// >注册虚拟表<br/>tEnv.createTemporaryView("stu_score", sourceStream, $("name"), >$("course"), $("score"));<br/>// >注册临时i系统函数<br/>tEnv.createTemporarySystemFunction("Top2", new 
>Top2RetractTableAggregateFunction());<br/>// >调用函数<br/>tEnv.from("stu_score")<br/> .groupBy($("course"))<br/> >.flatAggregate(call("Top2", $("score")).as("score", "rank"))<br/> >.select($("course"), $("score"), $("rank"))<br/> .execute()<br/> >.print();<br/>```<br/>Flink 版本:1.13.5 >在 2022-05-23 09:55:40,"Xuyang" <xyzhong...@163.com> 写道: >>Hi, 可以将你的taf具体代码发出来吗?还有你的版本,也贴一下。 >> >> >> >> >>-- >> >> Best! >> Xuyang >> >> >> >> >> >>在 2022-05-22 22:35:46,"赢峰" <si_ji_f...@163.com> 写道: >>> >>> >>>在自定义表聚合函数 TableAggregateFunction 时使用的是 emitUpdateWithRetract >>>输出数据。在调用的时候参考文档的使用方式: >>>``` >>>tEnv.from("stu_score") >>> .groupBy($("course")) >>> .flatAggregate(call(Top2RetractTableAggregateFunction.class, $("score"))) >>> .select($("course"), $("f0"), $("f1")) >>>``` >>>使用默认 blink Planner,会抛出如下异常: >>>``` >>>Exception in thread "main" org.apache.flink.table.api.ValidationException: >>>Could not find an implementation method 'emitValue' in class >>>'com.flink.example.table.function.custom.Top2RetractTableAggregateFunction' >>>for function 'Top2' that matches the following signature: >>>void >>>emitValue(com.flink.example.table.function.custom.Top2RetractTableAggregateFunction.Top2RetractAccumulator, >>> org.apache.flink.util.Collector) >>>``` >>>但是使用 Old Planner,则会正常输出: >>>``` >>>StreamExecutionEnvironment env = >>>StreamExecutionEnvironment.getExecutionEnvironment(); >>>env.setParallelism(1); >>>EnvironmentSettings settings = EnvironmentSettings >>> .newInstance() >>> .useOldPlanner() >>> .inStreamingMode() >>> .build(); >>>StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); >>>``` >>>这是什么地方使用有问题? >>> >>> >>> >>> >>>