Dear developers,

We are seeing data loss when sinking from Kafka to HBase. With the same SQL, sinking via JDBC instead loses no data. Flink version: 1.12. The DDL and job SQL are below.

Source table (ingested as canal-json):

create table rt_ods.ods_za_log_member_base_info (
  MemberId bigint COMMENT 'user ID',
  NickName string COMMENT 'user nickname',
  proctime as PROCTIME()
) WITH (
  'properties.bootstrap.servers' = 'XXXX:9092',
  'connector' = 'kafka',
  'format' = 'canal-json',
  'topic' = 'ods_user_info',
  'properties.group.id' = 'rt_ods-ods_za_log_member_base_info',
  'scan.startup.mode' = 'group-offsets'
)

Sink table (HBase):

create table rt_dwd.dwd_za_log_user_info (
  memberid STRING,
  f1 ROW<nick_name string>,
  PRIMARY KEY (memberid) NOT ENFORCED
) WITH (
  'sink.buffer-flush.max-size' = '0',
  'connector' = 'hbase-1.4',
  'zookeeper.quorum' = '10.51.3.48:2181',
  'sink.buffer-flush.interval' = '3s',
  'sink.buffer-flush.max-rows' = '100',
  'table-name' = 'dwd_user_info',
  'sink.parallelism' = '5'
)

Job SQL:

insert into rt_dwd.dwd_za_log_user_info
select cast(t1.MemberId as VARCHAR),
       ROW(t1.NickName)
from rt_ods.ods_za_log_member_base_info t1
where t1.MemberId is not null;

When sinking to HBase, records go missing; sinking the same stream to MySQL writes the data completely. MySQL table DDL:

create table rt_dwd.dwd_za_log_user_info_mysql (
  memberid string COMMENT 'primary key ID',
  nick_name string COMMENT 'nickname',
  primary key (memberid) NOT ENFORCED
) WITH (
  'password' = 'xxxx',
  'connector' = 'jdbc',
  'sink.buffer-flush.interval' = '3s',
  'sink.buffer-flush.max-rows' = '100',
  'table-name' = 'dwd_user_info',
  'url' = 'jdbc:mysql://xxxxx',
  'username' = 'xxx'
)

Additional context: the source is sharded into 100 tables; the binlog of these 100 tables is written to Kafka in canal-json format, then consumed and sunk to HBase. I am not sure whether the HBase sink parameters are mis-tuned; I also tried setting both 'sink.buffer-flush.interval' = '3s' and 'sink.buffer-flush.max-rows' = '100' to 0, with no effect.
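One thing that may be worth ruling out (this is an assumption on my part, based on the Flink HBase connector documentation, not a confirmed diagnosis): for the hbase-1.4 connector, setting 'sink.buffer-flush.max-size' or 'sink.buffer-flush.max-rows' to '0' disables that flush trigger rather than disabling buffering, so with all flush options at '0' buffered mutations may sit until checkpoint or job shutdown and look like lost data. A diagnostic variant of the sink DDL that forces a flush after every row might look like:

```sql
-- Hypothetical diagnostic variant of the HBase sink table (same schema,
-- same zookeeper.quorum / table-name as in the report above).
-- 'sink.buffer-flush.max-rows' = '1' flushes each mutation immediately,
-- taking batching out of the picture. 'sink.parallelism' is deliberately
-- omitted so the sink inherits the upstream parallelism; with a changelog
-- (canal-json) source, a parallelism change in front of the sink can
-- reorder the -U/+U records of the same key, which is another thing
-- worth testing for.
create table rt_dwd.dwd_za_log_user_info (
  memberid STRING,
  f1 ROW<nick_name string>,
  PRIMARY KEY (memberid) NOT ENFORCED
) WITH (
  'connector' = 'hbase-1.4',
  'zookeeper.quorum' = '10.51.3.48:2181',
  'table-name' = 'dwd_user_info',
  'sink.buffer-flush.max-rows' = '1'
)
```

If the loss disappears with per-row flushing but reappears with buffering enabled, that would point at the buffer/flush path (or at -U/+U reordering under the changed sink parallelism) rather than at the Kafka source.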