[ https://issues.apache.org/jira/browse/FLINK-33566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-33566:
-----------------------------------
    Labels: HBase-2.0 flink-connector-hbase pull-request-available  (was: HBase-2.0 flink-connector-hbase)

> HBase sql-connector needs to overwrite the rowKey
> -------------------------------------------------
>
>                 Key: FLINK-33566
>                 URL: https://issues.apache.org/jira/browse/FLINK-33566
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / HBase
>    Affects Versions: 1.18.0
>         Environment: flink: 1.18.0
> hbase: 2.2.3
> flink-connector-hbase-2.2: 3.0.0-1.18
>            Reporter: JankoWilliam
>            Priority: Major
>              Labels: HBase-2.0, flink-connector-hbase, pull-request-available
>             Fix For: hbase-3.0.0
>
>
> I want to write 50+ label values to a rowkey's column family in HBase (all
> values are 0/1 labels), for example:
> {"id":"1111","q1":"0","q2":"1","q3":"0","q4":"1",...,"q49":"0","q50":"1"}
> Four label values are used below for illustration:
> {"id":"1111","q1":"0","q2":"1","q3":"0","q4":"1"}
> {code:java}
> -- source:
> CREATE TABLE kafka_table_ (
>   `id` STRING,
>   `q1` STRING,
>   `q2` STRING,
>   `q3` STRING,
>   `q4` STRING
> ) WITH (
>   ...
>   'connector' = 'kafka',
>   'format' = 'json',
>   ...
> );
> -- sink:
> CREATE TABLE hbase_table_ (
>   rowkey STRING,
>   cf ROW<q1 INT, q2 INT, q3 INT, q4 INT>,
>   PRIMARY KEY (rowkey) NOT ENFORCED
> ) WITH (
>   'connector' = 'my-hbase-2.2',
>   'table-name' = 'test_table',
>   'zookeeper.quorum' = '127.0.0.1'
> );
> -- insert:
> INSERT INTO hbase_table_
> SELECT
>   id AS rowkey,
>   ROW(CAST(q1 AS INT), CAST(q2 AS INT), CAST(q3 AS INT), CAST(q4 AS INT)) AS cf
> FROM kafka_table_;
> {code}
> hbase:
> hbase(main):016:0> scan 'test_table'
> ROW     COLUMN+CELL
> 1111    column=cf:q1, timestamp=0000000000001, value=\x00\x00\x00\x00
> 1111    column=cf:q2, timestamp=0000000000001, value=\x00\x00\x00\x01
> 1111    column=cf:q3, timestamp=0000000000001, value=\x00\x00\x00\x00
> 1111    column=cf:q4, timestamp=0000000000001, value=\x00\x00\x00\x01
>
> The upstream data has a fixed set of 50+ key-value pairs, of which very few
> values are 1 (the default label value is 0). For example, when only one or
> two values are 1 (q2=1, q4=1), I want HBase to store only the following
> cells:
> hbase(main):016:0> scan 'test_table'
> ROW     COLUMN+CELL
> 1111    column=cf:q2, timestamp=0000000000001, value=\x00\x00\x00\x01
> 1111    column=cf:q4, timestamp=0000000000001, value=\x00\x00\x00\x01
>
> When I use the 'sink.ignore-null-value' option here, it merely skips
> updating null values; downstream third parties still read all the
> previously written cells (e.g. 50+), even though only 2 values are truly 1:
> {code:java}
> -- sink:
> CREATE TABLE hbase_table_ (
>   rowkey STRING,
>   cf ROW<q1 INT, q2 INT, q3 INT, q4 INT>,
>   PRIMARY KEY (rowkey) NOT ENFORCED
> ) WITH (
>   'connector' = 'my-hbase-2.2',
>   'table-name' = 'test_table',
>   'sink.ignore-null-value' = 'true',
>   'zookeeper.quorum' = '127.0.0.1'
> );
> -- insert:
> INSERT INTO hbase_table_
> SELECT
>   id AS rowkey,
>   ROW(
>     CASE WHEN q1 <> '0' THEN CAST(q1 AS INT) ELSE NULL END,
>     CASE WHEN q2 <> '0' THEN CAST(q2 AS INT) ELSE NULL END,
>     CASE WHEN q3 <> '0' THEN CAST(q3 AS INT) ELSE NULL END,
>     CASE WHEN q4 <> '0' THEN CAST(q4 AS INT) ELSE NULL END
>   ) AS cf
> FROM kafka_table_;
> {code}
> hbase(main):016:0> scan 'test_table'
> ROW     COLUMN+CELL
> 1111    column=cf:q1, timestamp=0000000000001, value=\x00\x00\x00\x00
> 1111    column=cf:q2, timestamp=0000000000002, value=\x00\x00\x00\x01
> 1111    column=cf:q3, timestamp=0000000000001, value=\x00\x00\x00\x00
> 1111    column=cf:q4, timestamp=0000000000002, value=\x00\x00\x00\x01
>
> No existing configuration covers this case, so I would like an
> overwrite-rowKey feature: delete the existing rowkey before writing the new
> data.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
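The contrast between the existing 'sink.ignore-null-value' behaviour and the requested overwrite-rowKey behaviour can be sketched in plain Java. This is a hypothetical simulation only, not the connector's actual code: an HBase row is modeled as a Map of qualifier to value, `ignoreNullUpsert` mimics the current option (skip null cells, so stale cells survive), and `overwriteUpsert` mimics the proposal (delete the whole row, then put only the non-null cells). All class and method names here are invented for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: simulates one HBase row (qualifier -> value) to show why
// 'sink.ignore-null-value' leaves stale cells while an overwrite-rowKey mode
// (Delete the row, then Put the non-null cells) would not.
public class OverwriteRowKeySketch {

    /** Existing behaviour: null cells are skipped, previously written cells survive. */
    static Map<String, Integer> ignoreNullUpsert(Map<String, Integer> row,
                                                 Map<String, Integer> update) {
        Map<String, Integer> result = new LinkedHashMap<>(row); // keep old cells
        for (Map.Entry<String, Integer> e : update.entrySet()) {
            if (e.getValue() != null) {
                result.put(e.getKey(), e.getValue()); // Put only non-null values
            }
        }
        return result;
    }

    /** Requested behaviour: Delete the whole row first, then Put non-null cells. */
    static Map<String, Integer> overwriteUpsert(Map<String, Integer> row,
                                                Map<String, Integer> update) {
        Map<String, Integer> result = new LinkedHashMap<>(); // row deleted
        for (Map.Entry<String, Integer> e : update.entrySet()) {
            if (e.getValue() != null) {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Cells written by the first insert: q1=0, q2=1, q3=0, q4=1.
        Map<String, Integer> row = new LinkedHashMap<>();
        row.put("q1", 0); row.put("q2", 1); row.put("q3", 0); row.put("q4", 1);

        // Second insert maps the 0-labels to NULL, keeping only q2=1 and q4=1.
        Map<String, Integer> update = new LinkedHashMap<>();
        update.put("q1", null); update.put("q2", 1);
        update.put("q3", null); update.put("q4", 1);

        // ignore-null keeps the stale q1/q3 cells ...
        System.out.println(ignoreNullUpsert(row, update)); // {q1=0, q2=1, q3=0, q4=1}
        // ... while overwrite leaves exactly the two cells the reporter wants.
        System.out.println(overwriteUpsert(row, update));  // {q2=1, q4=1}
    }
}
```

In a real connector the overwrite path would translate to issuing an HBase `Delete` for the row followed by a `Put` of the non-null cells, which is why it must be opt-in: the Delete+Put pair is not atomic and briefly removes the row.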