Carl created FLINK-26123:
----------------------------

             Summary: The data of the upsert-kafka source cannot be written to HBase under SQL WHERE conditions
                 Key: FLINK-26123
                 URL: https://issues.apache.org/jira/browse/FLINK-26123
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.12.1
            Reporter: Carl
         Attachments: image-2022-02-14-16-43-13-172.png, image-2022-02-14-16-47-26-820.png, image-2022-02-14-17-14-43-158.png, image-2022-02-14-17-15-27-611.png, image-2022-02-14-17-18-23-864.png, image-2022-02-14-17-20-04-228.png, image-2022-02-14-17-30-02-525.png, image-2022-02-14-17-32-40-475.png
*1. source table :*

*(1) kafka topic :*

./kafka-topics.sh --create --zookeeper kafka01:2181,kafka02:2181,kafka03:2181 --replication-factor 2 --partitions 2 --topic sink-hbase-where-01

*(2) flink kafka table :*

create table hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source (
    id string,
    ver int,
    dp_last_chg_time timestamp(3),
    kafka_ts timestamp(3) METADATA FROM 'timestamp' VIRTUAL,
    load_ts AS PROCTIME(),
    ts as dp_last_chg_time,
    WATERMARK FOR dp_last_chg_time AS dp_last_chg_time - INTERVAL '20' SECOND,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'sink-hbase-where-01',
    'properties.group.id' = 'sink-hbase-where-group-01',
    'properties.zookeeper.connect' = '...',
    'properties.bootstrap.servers' = '...',
    'key.format' = 'json',
    'key.json.ignore-parse-errors' = 'true',
    'value.format' = 'json',
    'value.json.ignore-parse-errors' = 'true',
    'value.fields-include' = 'ALL'
);

*2. sink table :*

CREATE TABLE hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink (
    `pk` STRING,
    info1 ROW<ver string, dp_last_chg_time string, kafka_ts string, load_ts string>,
    PRIMARY KEY (`pk`) NOT ENFORCED
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'sink-hbase-where',
    'sink.buffer-flush.max-size' = '0',
    'sink.buffer-flush.max-rows' = '0',
    'zookeeper.quorum' = '...'
);

*3. flink sql :*

insert into hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink
select keys, Row(ver, dp_last_chg_time, kafka_ts, load_ts)
from (
    select cast(t1.id as string) as keys,
           cast(t1.ver as string) as ver,
           cast(t1.ts as string) as dp_last_chg_time,
           cast(t1.kafka_ts as string) as kafka_ts,
           cast(TIMESTAMPADD(HOUR, 8, t1.load_ts) as string) as load_ts
    from hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source t1
    *where cast(t1.id as string) = '555'*
)

*4. test*

*test 1 :*

(1) produce kafka message:
{"id":"555"},{"id":"555","ver":1,"dp_last_chg_time":"2022-02-14 12:04:00"}

(2) scan hbase table:
!image-2022-02-14-17-14-43-158.png!
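For reference, keyed JSON records like the ones used in the tests can be produced with the standard Kafka console producer. The broker list below is a placeholder (the report elides the real addresses), and the `|` key separator is an arbitrary choice for this sketch:

```shell
# Produce one keyed JSON record for the upsert-kafka source topic.
# The key part (before '|') must match the table's 'key.format' = 'json' schema.
echo '{"id":"555"}|{"id":"555","ver":1,"dp_last_chg_time":"2022-02-14 12:04:00"}' | \
  ./kafka-console-producer.sh \
    --broker-list kafka01:9092,kafka02:9092,kafka03:9092 \
    --topic sink-hbase-where-01 \
    --property parse.key=true \
    --property key.separator='|'
```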
(3) produce kafka message:
{"id":"555"},{"id":"555","ver":2,"dp_last_chg_time":"2022-02-14 12:04:00"}

(4) scan hbase table:
!image-2022-02-14-17-15-27-611.png!

(5) produce kafka message:
{"id":"666"},{"id":"666","ver":1,"dp_last_chg_time":"2022-02-14 12:04:00"}

(6) scan hbase table:
!image-2022-02-14-17-18-23-864.png!

*test 2 :*

(1) cancel the flink app in IDEA
(2) truncate the hbase table:
!image-2022-02-14-16-47-26-820.png!
(3) run the flink app in IDEA
(4) scan hbase table: no data was written to HBase.
!image-2022-02-14-17-20-04-228.png!

*test 3 :*

(1) cancel the flink app in IDEA
(2) delete the where condition:

insert into hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink
select keys, Row(ver, dp_last_chg_time, kafka_ts, load_ts)
from (
    select cast(t1.id as string) as keys,
           cast(t1.ver as string) as ver,
           cast(t1.ts as string) as dp_last_chg_time,
           cast(t1.kafka_ts as string) as kafka_ts,
           cast(TIMESTAMPADD(HOUR, 8, t1.load_ts) as string) as load_ts
    from hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source t1
    -- *where cast(t1.id as string) = '555'*
)

(3) run the flink app in IDEA
(4) scan hbase table: the data was written to HBase:
!image-2022-02-14-17-30-02-525.png!

*test 4 :*

(1) cancel the flink app in IDEA
(2) update the where condition to '666':

insert into hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink
select keys, Row(ver, dp_last_chg_time, kafka_ts, load_ts)
from (
    select cast(t1.id as string) as keys,
           cast(t1.ver as string) as ver,
           cast(t1.ts as string) as dp_last_chg_time,
           cast(t1.kafka_ts as string) as kafka_ts,
           cast(TIMESTAMPADD(HOUR, 8, t1.load_ts) as string) as load_ts
    from hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source t1
    *where cast(t1.id as string) = '666'*
)

(3) run the flink app in IDEA
(4) scan hbase table: the data was written to HBase:
!image-2022-02-14-17-32-40-475.png!
From the test results:

(1) there are two records with a primary key of '555':
{"id":"555"},{"id":"555","ver":1,"dp_last_chg_time":"2022-02-14 12:04:00"}
{"id":"555"},{"id":"555","ver":2,"dp_last_chg_time":"2022-02-14 12:04:00"}

(2) there is one record with a primary key of '666':
{"id":"666"},{"id":"666","ver":1,"dp_last_chg_time":"2022-02-14 12:04:00"}

*re-running the flink sql in IDEA leads to the following conclusions:*

(1) with the following SQL, both records are written:

insert into hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink
select keys, Row(ver, dp_last_chg_time, kafka_ts, load_ts)
from (
    select cast(t1.id as string) as keys,
           cast(t1.ver as string) as ver,
           cast(t1.ts as string) as dp_last_chg_time,
           cast(t1.kafka_ts as string) as kafka_ts,
           cast(TIMESTAMPADD(HOUR, 8, t1.load_ts) as string) as load_ts
    from hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source t1
)

(2) with the following SQL, no data is written:

insert into hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink
select keys, Row(ver, dp_last_chg_time, kafka_ts, load_ts)
from (
    select cast(t1.id as string) as keys,
           cast(t1.ver as string) as ver,
           cast(t1.ts as string) as dp_last_chg_time,
           cast(t1.kafka_ts as string) as kafka_ts,
           cast(TIMESTAMPADD(HOUR, 8, t1.load_ts) as string) as load_ts
    from hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source t1
    *where cast(t1.id as string) = '555'*
)

(3) with the following SQL, the '666' record is written:

insert into hivecatalog.rt_flink.mid_dwd_t04_sink_hbase_where_01_hbase_sink
select keys, Row(ver, dp_last_chg_time, kafka_ts, load_ts)
from (
    select cast(t1.id as string) as keys,
           cast(t1.ver as string) as ver,
           cast(t1.ts as string) as dp_last_chg_time,
           cast(t1.kafka_ts as string) as kafka_ts,
           cast(TIMESTAMPADD(HOUR, 8, t1.load_ts) as string) as load_ts
    from hivecatalog.rt_flink.dwd_t04_sink_hbase_where_01_kafka_source t1
    *where cast(t1.id as string) = '666'*
)

--
This message was sent by Atlassian
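The expected behavior the tests check for can be illustrated with a minimal, self-contained Python sketch of upsert-changelog semantics (an illustration only, not the connector's actual implementation): the latest value per key is materialized, a null value acts as a tombstone (DELETE), and a WHERE condition should only restrict which keys reach the sink:

```python
def materialize(messages, predicate=lambda key: True):
    """Upsert semantics: keep the latest value per key, treat a None
    value as a tombstone (delete), and keep only keys matching the
    predicate (standing in for the SQL WHERE condition)."""
    state = {}
    for key, value in messages:
        if not predicate(key):
            continue
        if value is None:
            state.pop(key, None)   # tombstone -> DELETE
        else:
            state[key] = value     # INSERT / UPDATE
    return state

# The messages produced in the tests above:
messages = [
    ("555", {"id": "555", "ver": 1, "dp_last_chg_time": "2022-02-14 12:04:00"}),
    ("555", {"id": "555", "ver": 2, "dp_last_chg_time": "2022-02-14 12:04:00"}),
    ("666", {"id": "666", "ver": 1, "dp_last_chg_time": "2022-02-14 12:04:00"}),
]

# Without a WHERE condition: both keys are materialized (matches test 3).
assert materialize(messages) == {"555": messages[1][1], "666": messages[2][1]}

# With WHERE id = '555': key '555' at ver 2 should still reach the sink --
# this is the expected result that the report says does NOT happen in 1.12.1.
assert materialize(messages, lambda k: k == "555") == {"555": messages[1][1]}
```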
Jira (v8.20.1#820001)