Maybe try changing 'update-mode' from 'append' to an update mode?
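For what it's worth, in 1.11 the legacy Kafka table sink is append-only, so changing 'update-mode' alone may not be enough; Flink 1.12 introduced the upsert-kafka connector, which can consume update changes from a group by. A rough sketch of what the sink DDL could look like after upgrading (topic and bootstrap servers taken from this thread; the primary key and the key/value format choices are assumptions):

```sql
-- Hypothetical upsert sink for Flink 1.12+; shop_id as primary key is an assumption
CREATE TABLE kafka_table_2 (
    `shop_id` STRING,
    `age` STRING,
    `area` STRING,
    PRIMARY KEY (`shop_id`) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'user_visit_2',
    'properties.bootstrap.servers' = 'ip:9092',
    'key.format' = 'avro',
    'value.format' = 'avro'
)
```

With an upsert sink, the changelog produced by the GroupAggregate node is written as key-based updates instead of being rejected at validation time.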



------------------ Original Message ------------------
From: "user-zh" <flin...@163.com>
Sent: Wednesday, August 12, 2020, 3:58 PM
To: "user-zh" <user-zh@flink.apache.org>
Subject: Error when using a Kafka sink: AppendStreamTableSink doesn't support consuming update changes



Hi Jark:
Version: 1.11.0
Problem: In Flink SQL, writing data to a Kafka sink after a group by and a left join fails during SQL validation with:
AppendStreamTableSink doesn't support consuming update changes which is produced by node GroupAggregate


I would like the upsert operation to be applied to the Kafka sink so that it passes SQL validation, or alternatively to write to Kafka only after the upsert completes.


The SQL being executed is attached:
create table kafka_table_1 (
    `shop_id` varchar,
    `user_id` bigint,
    `category_id` int,
    `ts` bigint,
    `row_time` timestamp(3),
    `proc_time` timestamp(3)
) with (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_visit_1',
    'connector.startup-mode' = 'latest-offset',
    'connector.properties.bootstrap.servers' = 'ip:9092',
    'connector.properties.zookeeper.connect' = 'ip:2181',
    'update-mode' = 'append',
    'format.type' = 'avro-registry',
    'format.schema-subject' = 'user_visit',
    'format.schema-url' = 'http://ip:8081'
)


CREATE TABLE hbase_table (
    rowKey STRING,
    cf ROW<age STRING, area STRING>
) WITH (
    'connector.type' = 'hbase',
    'connector.version' = '1.4.3',
    'connector.table-name' = 'hbase_table',
    'connector.zookeeper.quorum' = 'ip:2181',
    'connector.zookeeper.znode.parent' = '/hbase',
    'connector.write.buffer-flush.max-rows' = '1000'
)




create table kafka_table_2 (
    `shop_id` varchar,
    `age` varchar,
    `area` varchar
) with (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_visit_2',
    'connector.startup-mode' = 'latest-offset',
    'connector.properties.bootstrap.servers' = 'ip:9092',
    'connector.properties.zookeeper.connect' = 'ip:2181',
    'update-mode' = 'append',
    'format.type' = 'avro-registry',
    'format.schema-subject' = 'user_visit',
    'format.schema-url' = 'http://ip:8081'
)


insert into kafka_table_2(shop_id, age, area)
select shop_id, age, area
from kafka_table_1 left join hbase_table
for system_time as of kafka_table_1.proc_time as temp
on kafka_table_1.shop_id = temp.rowKey
group by shop_id, age, area


Original Message
From: xiao cai <flin...@163.com>
To: user-zh <user-zh@flink.apache.org>
Sent: Wednesday, August 12, 2020, 15:41
Subject: AppendStreamTableSink doesn't support consuming update changes


Hi Jark: Version: 1.11.0 Problem: In Flink SQL, writing data to a Kafka sink after a group by and a left join fails during SQL validation with: AppendStreamTableSink doesn't support consuming update changes which is produced by node GroupAggregate
