Re: Error when using the Kafka sink: AppendStreamTableSink doesn't support consuming update changes

2020-08-12 Post by xiao cai
Dear Leonard Xu:
I will keep an eye on that issue. Thank you very much for the explanation.


 Original Message 
From: Leonard Xu
To: user-zh
Sent: Wednesday, August 12, 2020 16:05
Subject: Re: Error when using the Kafka sink: AppendStreamTableSink doesn't support consuming update changes


Hi,

Both GROUP BY and LEFT JOIN produce retract messages, and messages of that kind can only be handled by an UpsertStreamTableSink. The Kafka connector is currently implemented as an AppendStreamTableSink, so it cannot consume them. There is already an issue tracking this; the feature should be available in 1.12.

Best
Leonard
[1] https://issues.apache.org/jira/browse/FLINK-18826

> On 2020-08-12, at 15:58, xiao cai wrote:
> [...]

Re: Error when using the Kafka sink: AppendStreamTableSink doesn't support consuming update changes

2020-08-12 Post by Leonard Xu
Hi

Both GROUP BY and LEFT JOIN produce retract messages, and messages of that kind can only be handled by an UpsertStreamTableSink. The Kafka connector is currently implemented as an AppendStreamTableSink, so it cannot consume them.
There is already an issue tracking this; the feature should be available in 1.12.


Best
Leonard
[1] https://issues.apache.org/jira/browse/FLINK-18826
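For context, this capability shipped in Flink 1.12 as the upsert-kafka connector (FLIP-149). A minimal sketch of how the sink table from this thread could be declared with it; the topic and bootstrap servers are reused from the DDL quoted below for illustration, and the avro key/value formats are an assumption:

    create table kafka_table_2 (
        `shop_id` varchar,
        `age` varchar,
        `area` varchar,
        -- upsert-kafka requires a primary key; it should match the
        -- grouping columns of the query that writes into this table
        primary key (`shop_id`, `age`, `area`) not enforced
    ) with (
        'connector' = 'upsert-kafka',
        'topic' = 'user_visit_2',
        'properties.bootstrap.servers' = 'ip:9092',
        'key.format' = 'avro',
        'value.format' = 'avro'
    )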


> On 2020-08-12, at 15:58, xiao cai wrote:
> 
> Hi Jark:
> Version: 1.11.0
> Problem: in Flink SQL, writing data to a Kafka sink after a GROUP BY and a LEFT JOIN fails at the validation stage with:
> AppendStreamTableSink doesn't support consuming update changes which is produced by node GroupAggregate
> 
> 
> I would like the upsert operation to be accepted for the Kafka sink at SQL validation time, or to have the upsert complete first and then write the result to Kafka.
> 
> 
> The SQL being executed:
> create table kafka_table_1 (
>     `shop_id` varchar,
>     `user_id` bigint,
>     `category_id` int,
>     `ts` bigint,
>     `row_time` timestamp(3),
>     `proc_time` timestamp(3)
> ) with (
>     'connector.type' = 'kafka',
>     'connector.version' = 'universal',
>     'connector.topic' = 'user_visit_1',
>     'connector.startup-mode' = 'latest-offset',
>     'connector.properties.bootstrap.servers' = 'ip:9092',
>     'connector.properties.zookeeper.connect' = 'ip:2181',
>     'update-mode' = 'append',
>     'format.type' = 'avro-registry',
>     'format.schema-subject' = 'user_visit',
>     'format.schema-url' = 'http://ip:8081'
> )
> 
> 
> CREATE TABLE hbase_table (
>     rowKey STRING,
>     cf ROW<age VARCHAR, area VARCHAR>
> ) WITH (
>     'connector.type' = 'hbase',
>     'connector.version' = '1.4.3',
>     'connector.table-name' = 'hbase_table',
>     'connector.zookeeper.quorum' = 'ip:2181',
>     'connector.zookeeper.znode.parent' = '/hbase',
>     'connector.write.buffer-flush.max-rows' = '1000'
> )
> 
> 
> 
> 
> create table kafka_table_2 (
>     `shop_id` varchar,
>     `age` varchar,
>     `area` varchar
> ) with (
>     'connector.type' = 'kafka',
>     'connector.version' = 'universal',
>     'connector.topic' = 'user_visit_2',
>     'connector.startup-mode' = 'latest-offset',
>     'connector.properties.bootstrap.servers' = 'ip:9092',
>     'connector.properties.zookeeper.connect' = 'ip:2181',
>     'update-mode' = 'append',
>     'format.type' = 'avro-registry',
>     'format.schema-subject' = 'user_visit',
>     'format.schema-url' = 'http://ip:8081'
> )
> 
> 
> insert into kafka_table_2 (shop_id, age, area)
> select shop_id, temp.cf.age, temp.cf.area
> from kafka_table_1 left join hbase_table
> for system_time as of kafka_table_1.proc_time as temp
> on kafka_table_1.shop_id = temp.rowKey
> group by shop_id, temp.cf.age, temp.cf.area
> 
> 
> Original Message 
> From: xiao cai
> To: user-zh
> Sent: Wednesday, August 12, 2020 15:41
> Subject: AppendStreamTableSink doesn't support consuming update changes
> 
> 
> Hi Jark: Version: 1.11.0. Problem: in Flink SQL, writing data to a Kafka
> sink after a GROUP BY and a LEFT JOIN fails at the validation stage with:
> AppendStreamTableSink doesn't support consuming update changes which is
> produced by node GroupAggregate



Re: Error when using the Kafka sink: AppendStreamTableSink doesn't support consuming update changes

2020-08-12 Post by xuzh
Would it work to change 'update-mode' to an update mode?
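It would not on 1.11: the legacy Kafka table descriptor only validates with 'update-mode' = 'append'; an upsert mode requires a sink that implements UpsertStreamTableSink. For contrast, a hedged sketch with the Elasticsearch connector, which is such a sink (the host and index names here are made up for illustration):

    CREATE TABLE es_sink (
        shop_id VARCHAR,
        age VARCHAR,
        area VARCHAR
    ) WITH (
        'connector.type' = 'elasticsearch',
        'connector.version' = '6',
        'connector.hosts' = 'http://ip:9200',
        'connector.index' = 'user_visit',
        'connector.document-type' = 'user_visit',
        -- 'upsert' validates here because the Elasticsearch sink
        -- implements UpsertStreamTableSink
        'update-mode' = 'upsert',
        'format.type' = 'json'
    )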




------------------ Original Message ------------------
From: "user-zh"

Error when using the Kafka sink: AppendStreamTableSink doesn't support consuming update changes

2020-08-12 Post by xiao cai
Hi Jark:
Version: 1.11.0
Problem: in Flink SQL, writing data to a Kafka sink after a GROUP BY and a LEFT JOIN fails at the validation stage with:
AppendStreamTableSink doesn't support consuming update changes which is produced by node GroupAggregate


I would like the upsert operation to be accepted for the Kafka sink at SQL validation time, or to have the upsert complete first and then write the result to Kafka.


The SQL being executed:
create table kafka_table_1 (
    `shop_id` varchar,
    `user_id` bigint,
    `category_id` int,
    `ts` bigint,
    `row_time` timestamp(3),
    `proc_time` timestamp(3)
) with (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_visit_1',
    'connector.startup-mode' = 'latest-offset',
    'connector.properties.bootstrap.servers' = 'ip:9092',
    'connector.properties.zookeeper.connect' = 'ip:2181',
    'update-mode' = 'append',
    'format.type' = 'avro-registry',
    'format.schema-subject' = 'user_visit',
    'format.schema-url' = 'http://ip:8081'
)


CREATE TABLE hbase_table (
    rowKey STRING,
    cf ROW<age VARCHAR, area VARCHAR>
) WITH (
    'connector.type' = 'hbase',
    'connector.version' = '1.4.3',
    'connector.table-name' = 'hbase_table',
    'connector.zookeeper.quorum' = 'ip:2181',
    'connector.zookeeper.znode.parent' = '/hbase',
    'connector.write.buffer-flush.max-rows' = '1000'
)




create table kafka_table_2 (
    `shop_id` varchar,
    `age` varchar,
    `area` varchar
) with (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_visit_2',
    'connector.startup-mode' = 'latest-offset',
    'connector.properties.bootstrap.servers' = 'ip:9092',
    'connector.properties.zookeeper.connect' = 'ip:2181',
    'update-mode' = 'append',
    'format.type' = 'avro-registry',
    'format.schema-subject' = 'user_visit',
    'format.schema-url' = 'http://ip:8081'
)


insert into kafka_table_2 (shop_id, age, area)
select shop_id, temp.cf.age, temp.cf.area
from kafka_table_1 left join hbase_table
for system_time as of kafka_table_1.proc_time as temp
on kafka_table_1.shop_id = temp.rowKey
group by shop_id, temp.cf.age, temp.cf.area
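As a 1.11-era workaround, the aggregation itself can be made append-only so that it fits the current Kafka sink. A sketch under two assumptions: proc_time is declared as a processing-time attribute (proc_time as PROCTIME()) rather than a plain timestamp, and the HBase cf row carries the age/area fields. Grouping by a tumbling window emits each group exactly once when the window closes, so no retractions reach the sink:

    insert into kafka_table_2 (shop_id, age, area)
    select shop_id, temp.cf.age, temp.cf.area
    from kafka_table_1 left join hbase_table
    for system_time as of kafka_table_1.proc_time as temp
    on kafka_table_1.shop_id = temp.rowKey
    -- the tumbling window makes the aggregate append-only: each
    -- (window, keys) group produces a single final row
    group by tumble(kafka_table_1.proc_time, interval '1' minute),
             shop_id, temp.cf.age, temp.cf.area

The result then validates against the AppendStreamTableSink, at the cost of emitting per-window results rather than continuously updated aggregates.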


 Original Message 
From: xiao cai
To: user-zh
Sent: Wednesday, August 12, 2020 15:41
Subject: AppendStreamTableSink doesn't support consuming update changes


Hi Jark: Version: 1.11.0. Problem: in Flink SQL, writing data to a Kafka
sink after a GROUP BY and a LEFT JOIN fails at the validation stage with:
AppendStreamTableSink doesn't support consuming update changes which is
produced by node GroupAggregate