INSERT INTO kafka_dws_artemis_out_order
SELECT warehouse_id, COUNT(*)
FROM kafka_ods_artemis_out_order
GROUP BY warehouse_id;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Table sink 
'myhive.wanglei.kafka_dws_artemis_out_order' doesn't support consuming update 
changes which is produced by node GroupAggregate(groupBy=[warehouse_id], 
select=[warehouse_id, COUNT(*) AS EXPR$1])
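
For context on the "update changes" in the message above: a GROUP BY count over an unbounded stream cannot emit append-only rows, because each new input row changes a count that was already emitted. The following is a minimal plain-Java simulation of that behavior, not Flink code. The class name, the method name, and the sample warehouse ids are hypothetical, and the "+I", "-U", "+U" labels only mimic the short names Flink uses for insert, update-before, and update-after rows.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: simulates the changelog of
//   SELECT warehouse_id, COUNT(*) ... GROUP BY warehouse_id
// over a stream of incoming warehouse ids. This is not Flink's implementation.
class GroupCountChangelogSketch {

    static List<String> changelog(List<String> warehouseIds) {
        Map<String, Long> counts = new HashMap<>();
        List<String> changes = new ArrayList<>();
        for (String id : warehouseIds) {
            Long previous = counts.get(id);
            long current = (previous == null) ? 1L : previous + 1L;
            if (previous == null) {
                // First row for this key: a plain insert.
                changes.add("+I(" + id + ", " + current + ")");
            } else {
                // Key seen before: retract the old count, then emit the new one.
                changes.add("-U(" + id + ", " + previous + ")");
                changes.add("+U(" + id + ", " + current + ")");
            }
            counts.put(id, current);
        }
        return changes;
    }

    public static void main(String[] args) {
        for (String change : changelog(List.of("w1", "w2", "w1"))) {
            System.out.println(change);
        }
    }
}
```

For the input w1, w2, w1 this prints +I(w1, 1), +I(w2, 1), -U(w1, 1), +U(w1, 2), one per line. The last two rows are the kind of update changes that an append-only sink refuses to consume.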

In Flink 1.10 it was possible to modify KafkaTableSinkBase so that it implements RetractStreamTableSink.
 
I see that Flink 1.11 now uses KafkaDynamicSource and KafkaDynamicSink. What would I need to change there so that the result of a GROUP BY can also be written to Kafka?

Thanks,
Wang Lei


wangl...@geekplus.com.cn 
