Re: Unsubscribe
Hi, to unsubscribe, please send an email to user-zh-unsubscr...@flink.apache.org

At 2022-05-22 15:32:02, "xudongjun123...@163.com" wrote:
>Unsubscribe
>
>
>
>xudongjun123...@163.com
Re: Exception when using emitUpdateWithRetract in a custom TableAggregateFunction
Hi, could you post the actual code of your TableAggregateFunction? Please also tell us which version you are using.

--
Best!
Xuyang

At 2022-05-22 22:35:46, "赢峰" wrote:
>
>
>In my custom TableAggregateFunction I use emitUpdateWithRetract to emit
>results. The call follows the usage shown in the documentation:
>```
>tEnv.from("stu_score")
>.groupBy($("course"))
>.flatAggregate(call(Top2RetractTableAggregateFunction.class, $("score")))
>.select($("course"), $("f0"), $("f1"))
>```
>With the default Blink planner, the following exception is thrown:
>```
>Exception in thread "main" org.apache.flink.table.api.ValidationException:
>Could not find an implementation method 'emitValue' in class
>'com.flink.example.table.function.custom.Top2RetractTableAggregateFunction'
>for function 'Top2' that matches the following signature:
>void
>emitValue(com.flink.example.table.function.custom.Top2RetractTableAggregateFunction.Top2RetractAccumulator,
> org.apache.flink.util.Collector)
>```
>With the old planner, however, the output is produced as expected:
>```
>StreamExecutionEnvironment env =
>StreamExecutionEnvironment.getExecutionEnvironment();
>env.setParallelism(1);
>EnvironmentSettings settings = EnvironmentSettings
>.newInstance()
>.useOldPlanner()
>.inStreamingMode()
>.build();
>StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
>```
>What am I doing wrong here?
>
>
>
>
>
Exception when using emitUpdateWithRetract in a custom TableAggregateFunction
In my custom TableAggregateFunction I use emitUpdateWithRetract to emit results. The call follows the usage shown in the documentation:
```
tEnv.from("stu_score")
    .groupBy($("course"))
    .flatAggregate(call(Top2RetractTableAggregateFunction.class, $("score")))
    .select($("course"), $("f0"), $("f1"))
```
With the default Blink planner, the following exception is thrown:
```
Exception in thread "main" org.apache.flink.table.api.ValidationException: Could not find an implementation method 'emitValue' in class 'com.flink.example.table.function.custom.Top2RetractTableAggregateFunction' for function 'Top2' that matches the following signature:
void emitValue(com.flink.example.table.function.custom.Top2RetractTableAggregateFunction.Top2RetractAccumulator, org.apache.flink.util.Collector)
```
With the old planner, however, the output is produced as expected:
```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .useOldPlanner()
    .inStreamingMode()
    .build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
```
What am I doing wrong here?
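Note on the exception above: the message says the planner looked for a method `emitValue(accumulator, Collector)` and did not find one. One thing that may be worth trying, offered as an assumption and not as a confirmed fix, is to give the function an `emitValue` method next to `emitUpdateWithRetract`. The sketch below shows only the shape of that method for a top-2 accumulator. It is deliberately free of Flink dependencies: `Top2Sketch`, `Top2Accumulator`, its fields, and the nested `Collector` interface are hypothetical stand-ins. In a real job the class would extend `TableAggregateFunction` and use `org.apache.flink.util.Collector`, as named in the error message.

```java
import java.util.ArrayList;
import java.util.List;

public class Top2Sketch {

    /** Stand-in for org.apache.flink.util.Collector so the sketch compiles without Flink. */
    interface Collector<T> {
        void collect(T record);
    }

    /** Hypothetical accumulator holding the two largest scores seen so far. */
    static class Top2Accumulator {
        int first = Integer.MIN_VALUE;   // MIN_VALUE means "not set yet"
        int second = Integer.MIN_VALUE;
    }

    static Top2Accumulator createAccumulator() {
        return new Top2Accumulator();
    }

    /** Updates the accumulator with one incoming score. */
    static void accumulate(Top2Accumulator acc, int score) {
        if (score > acc.first) {
            acc.second = acc.first;
            acc.first = score;
        } else if (score > acc.second) {
            acc.second = score;
        }
    }

    /**
     * Emits the full current result as {score, rank} pairs.
     * This mirrors the (accumulator, Collector) signature from the exception.
     */
    static void emitValue(Top2Accumulator acc, Collector<int[]> out) {
        if (acc.first != Integer.MIN_VALUE) {
            out.collect(new int[] {acc.first, 1});
        }
        if (acc.second != Integer.MIN_VALUE) {
            out.collect(new int[] {acc.second, 2});
        }
    }

    public static void main(String[] args) {
        Top2Accumulator acc = createAccumulator();
        for (int score : new int[] {80, 95, 70}) {
            accumulate(acc, score);
        }
        List<int[]> rows = new ArrayList<>();
        emitValue(acc, rows::add);
        for (int[] row : rows) {
            System.out.println("score=" + row[0] + ", rank=" + row[1]);
        }
    }
}
```

Unlike `emitUpdateWithRetract`, a method of this shape emits the complete current top 2 on every call and does not send retractions for individual rows. Whether this resolves the problem depends on the Flink version in use, which the thread does not state.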
Unsubscribe
Unsubscribe

xudongjun123...@163.com