You can use the Canal or Debezium format to write to Kafka; those formats support update and delete messages.
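As a minimal sketch of that suggestion: the table name below comes from the error message in the question, but the column list is abbreviated and the topic/broker settings are illustrative assumptions, not from this thread.

```sql
-- Hypothetical sink DDL sketch; topic and broker values are placeholders.
CREATE TABLE mvp_rtdwb_user_business (
  dt STRING,
  user_id BIGINT,
  text_feed_count BIGINT,
  event_time TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'mvp_rtdwb_user_business',          -- assumed topic name
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'debezium-json'                    -- or 'canal-json'; both encode
                                                -- update/delete changelog messages
);
```

With a changelog format such as `debezium-json` or `canal-json`, the sink can consume the update changes produced by the `GroupAggregate` node instead of rejecting them.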

op <520075...@qq.com> wrote on Wed, Jul 29, 2020 at 11:59 AM:

> As shown below, I want to write aggregation results directly to Kafka with
> SQL on version 1.11. Is there any way to solve this, or is converting to a
> DataStream the only option?
>
>
> Thanks
>
>
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Table sink 'default_catalog.default_database.mvp_rtdwb_user_business'
> doesn't support consuming update changes which is produced by node
> GroupAggregate(groupBy=[dt, user_id], select=[dt, user_id, SUM($f2) AS
> text_feed_count, SUM($f3) AS picture_feed_count, SUM($f4) AS
> be_comment_forward_user_count, SUM($f5) AS share_link_count, SUM($f6) AS
> share_music_count, SUM($f7) AS share_video_count, SUM($f8) AS follow_count,
> SUM($f9) AS direct_post_count, SUM($f10) AS comment_post_count, SUM($f11)
> AS comment_count, SUM($f12) AS fans_count, MAX(event_time) AS event_time])



-- 

Best,
Benchao Li
