OK, thanks.
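On 2022-05-24 21:23:56, "Xuyang" <xyzhong...@163.com> wrote:
>Hi,
>I debugged into the code, and this looks like a bug. With the Old Planner, you
>only need to define one of emitValue and emitUpdateWithRetract, and
>emitUpdateWithRetract takes priority over emitValue. The Blink Planner, however,
>only checks whether emitValue is defined. You could report this bug in the
>Flink issue tracker.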
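To make the difference described above concrete, here is a minimal plain-Java sketch of the two lookup rules. It does not use Flink and is not the planners' actual code: the classes `OnlyRetract`, `BothEmitMethods`, and `EmitMethodLookup` are hypothetical stand-ins, and the logic only mirrors the behaviour as described in this message.

```java
import java.util.Arrays;

// Hypothetical stand-in: a function that defines only emitUpdateWithRetract.
class OnlyRetract {
    public void emitUpdateWithRetract(Object acc, Object out) {}
}

// Hypothetical stand-in: a function that defines both emit methods.
class BothEmitMethods {
    public void emitValue(Object acc, Object out) {}
    public void emitUpdateWithRetract(Object acc, Object out) {}
}

class EmitMethodLookup {
    // True if the class exposes a public method with the given name.
    static boolean hasMethod(Class<?> c, String name) {
        return Arrays.stream(c.getMethods())
                .anyMatch(m -> m.getName().equals(name));
    }

    // Old Planner rule as described: either method is enough,
    // and emitUpdateWithRetract is preferred when both exist.
    static String oldPlannerChoice(Class<?> c) {
        if (hasMethod(c, "emitUpdateWithRetract")) {
            return "emitUpdateWithRetract";
        }
        if (hasMethod(c, "emitValue")) {
            return "emitValue";
        }
        throw new IllegalStateException("no emit method in " + c.getSimpleName());
    }

    // Blink Planner rule as described: only the presence of emitValue is checked.
    static boolean blinkPlannerAccepts(Class<?> c) {
        return hasMethod(c, "emitValue");
    }

    public static void main(String[] args) {
        System.out.println(oldPlannerChoice(OnlyRetract.class));
        System.out.println(blinkPlannerAccepts(OnlyRetract.class));
        System.out.println(blinkPlannerAccepts(BothEmitMethods.class));
    }
}
```

Under this reading, a function that defines only emitUpdateWithRetract passes the Old Planner rule but fails the Blink Planner rule, which is consistent with the ValidationException reported in the thread.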
>On 2022-05-23 18:24:17, "sjf0115" <si_ji_f...@163.com> wrote:
>>Flink version: 1.13.5
>>
>>The complete function code is as follows:
>>```
>>import java.util.Objects;
>>
>>import org.apache.flink.api.java.tuple.Tuple2;
>>import org.apache.flink.table.functions.TableAggregateFunction;
>>import org.slf4j.Logger;
>>import org.slf4j.LoggerFactory;
>>
>>public class Top2RetractTableAggregateFunction extends
>>        TableAggregateFunction<Tuple2<Long, Integer>,
>>                Top2RetractTableAggregateFunction.Top2RetractAccumulator> {
>>    private static final Logger LOG =
>>            LoggerFactory.getLogger(Top2RetractTableAggregateFunction.class);
>>
>>    // Intermediate result (accumulator) of the Top2 aggregation:
>>    // before* hold the values last emitted, after* hold the current top two
>>    public static class Top2RetractAccumulator {
>>        public long beforeFirst = 0;
>>        public long beforeSecond = 0;
>>        public long afterFirst = 0;
>>        public long afterSecond = 0;
>>    }
>>
>>    // Create and initialize the Top2RetractAccumulator
>>    @Override
>>    public Top2RetractAccumulator createAccumulator() {
>>        LOG.info("[INFO] createAccumulator ...........................");
>>        Top2RetractAccumulator acc = new Top2RetractAccumulator();
>>        acc.beforeFirst = Integer.MIN_VALUE;
>>        acc.beforeSecond = Integer.MIN_VALUE;
>>        acc.afterFirst = Integer.MIN_VALUE;
>>        acc.afterSecond = Integer.MIN_VALUE;
>>        return acc;
>>    }
>>
>>    // Receive an input element and accumulate it into the accumulator
>>    public void accumulate(Top2RetractAccumulator acc, Long value) {
>>        LOG.info("[INFO] accumulate ...........................");
>>        if (value > acc.afterFirst) {
>>            acc.afterSecond = acc.afterFirst;
>>            acc.afterFirst = value;
>>        } else if (value > acc.afterSecond) {
>>            acc.afterSecond = value;
>>        }
>>    }
>>
>>    // Emit output with retraction
>>    public void emitUpdateWithRetract(Top2RetractAccumulator acc,
>>            RetractableCollector<Tuple2<Long, Integer>> out) {
>>        LOG.info("[INFO] emitUpdateWithRetract ...........................");
>>        if (!Objects.equals(acc.afterFirst, acc.beforeFirst)) {
>>            // Retract the old record
>>            if (acc.beforeFirst != Integer.MIN_VALUE) {
>>                out.retract(Tuple2.of(acc.beforeFirst, 1));
>>            }
>>            // Emit the new record
>>            out.collect(Tuple2.of(acc.afterFirst, 1));
>>            acc.beforeFirst = acc.afterFirst;
>>        }
>>        if (!Objects.equals(acc.afterSecond, acc.beforeSecond)) {
>>            // Retract the old record
>>            if (acc.beforeSecond != Integer.MIN_VALUE) {
>>                out.retract(Tuple2.of(acc.beforeSecond, 2));
>>            }
>>            // Emit the new record
>>            out.collect(Tuple2.of(acc.afterSecond, 2));
>>            acc.beforeSecond = acc.afterSecond;
>>        }
>>    }
>>}
>>```
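As a rough illustration of what the accumulate and emitUpdateWithRetract logic above produces, here is a plain-Java sketch that replays the same comparisons without Flink. It is only a simulation under assumptions: the class `Top2RetractSketch` and its `run` helper are hypothetical, changelog entries are rendered as strings (`+` for collect, `-` for retract), and it assumes the emit method is invoked once after every accumulated record.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical Flink-free replay of the Top2 retract logic from the post.
class Top2RetractSketch {
    // Same sentinel as in the post: "no value seen yet".
    static final long EMPTY = Integer.MIN_VALUE;

    long beforeFirst = EMPTY, beforeSecond = EMPTY; // values last emitted
    long afterFirst = EMPTY, afterSecond = EMPTY;   // current top two

    void accumulate(long value) {
        if (value > afterFirst) {
            afterSecond = afterFirst;
            afterFirst = value;
        } else if (value > afterSecond) {
            afterSecond = value;
        }
    }

    // Returns the changes since the last call: "-(v,rank)" retracts, "+(v,rank)" collects.
    List<String> emitUpdateWithRetract() {
        List<String> out = new ArrayList<>();
        if (afterFirst != beforeFirst) {
            if (beforeFirst != EMPTY) {
                out.add("-(" + beforeFirst + ",1)");
            }
            out.add("+(" + afterFirst + ",1)");
            beforeFirst = afterFirst;
        }
        if (afterSecond != beforeSecond) {
            if (beforeSecond != EMPTY) {
                out.add("-(" + beforeSecond + ",2)");
            }
            out.add("+(" + afterSecond + ",2)");
            beforeSecond = afterSecond;
        }
        return out;
    }

    // Assumption: one emit call per input record.
    static List<String> run(long... scores) {
        Top2RetractSketch acc = new Top2RetractSketch();
        List<String> log = new ArrayList<>();
        for (long s : scores) {
            acc.accumulate(s);
            log.addAll(acc.emitUpdateWithRetract());
        }
        return log;
    }

    public static void main(String[] args) {
        // Four scores for one course, in arrival order.
        System.out.println(run(78, 50, 99, 80));
        // prints [+(78,1), +(50,2), -(78,1), +(99,1), -(50,2), +(78,2), -(78,2), +(80,2)]
    }
}
```

Only the ranks that actually changed are retracted and re-emitted, which is the point of emitUpdateWithRetract compared with re-emitting the full top two on every record.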
>>The complete calling code is as follows:
>>```
>>// Assumes static imports of org.apache.flink.table.api.Expressions.$
>>// and org.apache.flink.table.api.Expressions.call
>>
>>// Execution environment
>>StreamExecutionEnvironment env =
>>        StreamExecutionEnvironment.getExecutionEnvironment();
>>env.setParallelism(1);
>>EnvironmentSettings settings = EnvironmentSettings
>>        .newInstance()
>>        .useOldPlanner() // Fails with the Blink Planner; works with the Old Planner
>>        .inStreamingMode()
>>        .build();
>>StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
>>
>>DataStream<Row> sourceStream = env.fromElements(
>>        Row.of("李雷", "语文", 78),
>>        Row.of("韩梅梅", "语文", 50),
>>        Row.of("李雷", "语文", 99),
>>        Row.of("韩梅梅", "语文", 80),
>>        Row.of("李雷", "英语", 90),
>>        Row.of("韩梅梅", "英语", 40),
>>        Row.of("李雷", "英语", 98),
>>        Row.of("韩梅梅", "英语", 88)
>>);
>>
>>// Register a temporary view
>>tEnv.createTemporaryView("stu_score", sourceStream, $("name"), $("course"),
>>        $("score"));
>>// Register a temporary system function
>>tEnv.createTemporarySystemFunction("Top2", new
>>        Top2RetractTableAggregateFunction());
>>// Call the function
>>tEnv.from("stu_score")
>>        .groupBy($("course"))
>>        .flatAggregate(call("Top2", $("score")).as("score", "rank"))
>>        .select($("course"), $("score"), $("rank"))
>>        .execute()
>>        .print();
>>```
>>On 2022-05-23 18:21:42, "sjf0115" <si_ji_f...@163.com> wrote:
>>>The function code is as follows:
>>>```
>>>public class Top2RetractTableAggregateFunction extends
>>>        TableAggregateFunction<Tuple2<Long, Integer>,
>>>                Top2RetractTableAggregateFunction.Top2RetractAccumulator> {
>>>    private static final Logger LOG =
>>>            LoggerFactory.getLogger(Top2RetractTableAggregateFunction.class);
>>>    // Intermediate result (accumulator) of the Top2 aggregation
>>>    public static class Top2RetractAccumulator {
>>>        public long beforeFirst = 0;
>>>        public long beforeSecond = 0;
>>>        public long afterFirst = 0;
>>>        public long afterSecond = 0;
>>>    }
>>>
>>>    // Create and initialize the Top2RetractAccumulator
>>>    @Override
>>>    public Top2RetractAccumulator createAccumulator() {
>>>        LOG.info("[INFO] createAccumulator ...........................");
>>>        Top2RetractAccumulator acc = new Top2RetractAccumulator();
>>>        acc.beforeFirst = Integer.MIN_VALUE;
>>>        acc.beforeSecond = Integer.MIN_VALUE;
>>>        acc.afterFirst = Integer.MIN_VALUE;
>>>        acc.afterSecond = Integer.MIN_VALUE;
>>>        return acc;
>>>    }
>>>
>>>    // Receive an input element and accumulate it into the accumulator
>>>    public void accumulate(Top2RetractAccumulator acc, Long value) {
>>>        LOG.info("[INFO] accumulate ...........................");
>>>        if (value > acc.afterFirst) {
>>>            acc.afterSecond = acc.afterFirst;
>>>            acc.afterFirst = value;
>>>        } else if (value > acc.afterSecond) {
>>>            acc.afterSecond = value;
>>>        }
>>>    }
>>>
>>>    // Emit output with retraction
>>>    public void emitUpdateWithRetract(Top2RetractAccumulator acc,
>>>            RetractableCollector<Tuple2<Long, Integer>> out) {
>>>        LOG.info("[INFO] emitUpdateWithRetract ...........................");
>>>        if (!Objects.equals(acc.afterFirst, acc.beforeFirst)) {
>>>            // Retract the old record
>>>            if (acc.beforeFirst != Integer.MIN_VALUE) {
>>>                out.retract(Tuple2.of(acc.beforeFirst, 1));
>>>            }
>>>            // Emit the new record
>>>            out.collect(Tuple2.of(acc.afterFirst, 1));
>>>            acc.beforeFirst = acc.afterFirst;
>>>        }
>>>        if (!Objects.equals(acc.afterSecond, acc.beforeSecond)) {
>>>            // Retract the old record
>>>            if (acc.beforeSecond != Integer.MIN_VALUE) {
>>>                out.retract(Tuple2.of(acc.beforeSecond, 2));
>>>            }
>>>            // Emit the new record
>>>            out.collect(Tuple2.of(acc.afterSecond, 2));
>>>            acc.beforeSecond = acc.afterSecond;
>>>        }
>>>    }
>>>}
>>>```
>>>The complete calling code:
>>>```
>>>// Execution environment
>>>StreamExecutionEnvironment env =
>>>        StreamExecutionEnvironment.getExecutionEnvironment();
>>>env.setParallelism(1);
>>>EnvironmentSettings settings = EnvironmentSettings
>>>        .newInstance()
>>>        .useOldPlanner() // Fails with the Blink Planner; works with the Old Planner
>>>        .inStreamingMode()
>>>        .build();
>>>StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
>>>
>>>DataStream<Row> sourceStream = env.fromElements(
>>>        Row.of("李雷", "语文", 78),
>>>        Row.of("韩梅梅", "语文", 50),
>>>        Row.of("李雷", "语文", 99),
>>>        Row.of("韩梅梅", "语文", 80),
>>>        Row.of("李雷", "英语", 90),
>>>        Row.of("韩梅梅", "英语", 40),
>>>        Row.of("李雷", "英语", 98),
>>>        Row.of("韩梅梅", "英语", 88)
>>>);
>>>
>>>// Register a temporary view
>>>tEnv.createTemporaryView("stu_score", sourceStream, $("name"),
>>>        $("course"), $("score"));
>>>// Register a temporary system function
>>>tEnv.createTemporarySystemFunction("Top2", new
>>>        Top2RetractTableAggregateFunction());
>>>// Call the function
>>>tEnv.from("stu_score")
>>>        .groupBy($("course"))
>>>        .flatAggregate(call("Top2", $("score")).as("score", "rank"))
>>>        .select($("course"), $("score"), $("rank"))
>>>        .execute()
>>>        .print();
>>>```
>>>Flink version: 1.13.5
>>>On 2022-05-23 09:55:40, "Xuyang" <xyzhong...@163.com> wrote:
>>>>Hi, could you post the actual code of your TableAggregateFunction? Please
>>>>also include your Flink version.
>>>>
>>>>--
>>>>
>>>> Best!
>>>> Xuyang
>>>>
>>>>On 2022-05-22 22:35:46, "赢峰" <si_ji_f...@163.com> wrote:
>>>>>
>>>>>In my custom TableAggregateFunction I use emitUpdateWithRetract to emit the
>>>>>output. I call the function the way the documentation shows:
>>>>>```
>>>>>tEnv.from("stu_score")
>>>>> .groupBy($("course"))
>>>>> .flatAggregate(call(Top2RetractTableAggregateFunction.class,
>>>>> $("score")))
>>>>> .select($("course"), $("f0"), $("f1"))
>>>>>```
>>>>>With the default Blink Planner, the following exception is thrown:
>>>>>```
>>>>>Exception in thread "main" org.apache.flink.table.api.ValidationException:
>>>>>Could not find an implementation method 'emitValue' in class
>>>>>'com.flink.example.table.function.custom.Top2RetractTableAggregateFunction'
>>>>> for function 'Top2' that matches the following signature:
>>>>>void
>>>>>emitValue(com.flink.example.table.function.custom.Top2RetractTableAggregateFunction.Top2RetractAccumulator,
>>>>> org.apache.flink.util.Collector)
>>>>>```
>>>>>With the Old Planner, however, the output is produced normally:
>>>>>```
>>>>>StreamExecutionEnvironment env =
>>>>>StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>env.setParallelism(1);
>>>>>EnvironmentSettings settings = EnvironmentSettings
>>>>> .newInstance()
>>>>> .useOldPlanner()
>>>>> .inStreamingMode()
>>>>> .build();
>>>>>StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
>>>>>```
>>>>>What am I doing wrong here?