Re:退订

2022-05-22 文章 Xuyang
Hi,退订请发送至邮箱user-zh-unsubscr...@flink.apache.org
在 2022-05-22 15:32:02,"xudongjun123...@163.com"  写道:
>退订
>
>
>
>xudongjun123...@163.com


Re:在自定义表聚合函数 TableAggregateFunction 使用 emitUpdateWithRetract 异常

2022-05-22 文章 Xuyang
Hi, 可以将你的taf具体代码发出来吗?还有你的版本,也贴一下。




--

Best!
Xuyang





在 2022-05-22 22:35:46,"赢峰"  写道:
>
>
>在自定义表聚合函数 TableAggregateFunction 时使用的是 emitUpdateWithRetract 
>输出数据。在调用的时候参考文档的使用方式:
>```
>tEnv.from("stu_score")
>.groupBy($("course"))
>.flatAggregate(call(Top2RetractTableAggregateFunction.class, $("score")))
>.select($("course"), $("f0"), $("f1"))
>```
>使用默认 blink Planner,会抛出如下异常:
>```
>Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>Could not find an implementation method 'emitValue' in class 
>'com.flink.example.table.function.custom.Top2RetractTableAggregateFunction' 
>for function 'Top2' that matches the following signature:
>void 
>emitValue(com.flink.example.table.function.custom.Top2RetractTableAggregateFunction.Top2RetractAccumulator,
> org.apache.flink.util.Collector)
>```
>但是使用 Old Planner,则会正常输出:
>```
>StreamExecutionEnvironment env = 
>StreamExecutionEnvironment.getExecutionEnvironment();
>env.setParallelism(1);
>EnvironmentSettings settings = EnvironmentSettings
>.newInstance()
>.useOldPlanner()
>.inStreamingMode()
>.build();
>StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
>```
>这是什么地方使用有问题?
>
>
>
>
> 


在自定义表聚合函数 TableAggregateFunction 使用 emitUpdateWithRetract 异常

2022-05-22 文章 赢峰


在自定义表聚合函数 TableAggregateFunction 时使用的是 emitUpdateWithRetract 
输出数据。在调用的时候参考文档的使用方式:
```
tEnv.from("stu_score")
.groupBy($("course"))
.flatAggregate(call(Top2RetractTableAggregateFunction.class, $("score")))
.select($("course"), $("f0"), $("f1"))
```
使用默认 blink Planner,会抛出如下异常:
```
Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Could not find an implementation method 'emitValue' in class 
'com.flink.example.table.function.custom.Top2RetractTableAggregateFunction' for 
function 'Top2' that matches the following signature:
void 
emitValue(com.flink.example.table.function.custom.Top2RetractTableAggregateFunction.Top2RetractAccumulator,
 org.apache.flink.util.Collector)
```
但是使用 Old Planner,则会正常输出:
```
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useOldPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
```
这是什么地方使用有问题?




 

22/05/2022 22:28:33 自动保存草稿

2022-05-22 文章 赢峰


在自定义表聚合函数 TableAggregateFunction 时使用的是 emitUpdateWithRetract 
输出数据。在调用的时候参考文档的使用方式:
```
tEnv.from("stu_score")
.groupBy($("course"))
.flatAggregate(call(Top2RetractTableAggregateFunction.class, $("score")))
.select($("course"), $("f0"), $("f1"))
```
使用默认 blink Planner,会抛出如下异常:
```
Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Could not find an implementation method 'emitValue' in class 
'com.flink.example.table.function.custom.Top2RetractTableAggregateFunction' for 
function 'Top2' that matches the following signature:
void 
emitValue(com.flink.example.table.function.custom.Top2RetractTableAggregateFunction.Top2RetractAccumulator,
 org.apache.flink.util.Collector)
```
但是使用 Old Planner,则会正常输出:
```
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useOldPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
```
这是什么地方使用有问题?

退订

2022-05-22 文章 xudongjun123...@163.com
退订



xudongjun123...@163.com