[ 
https://issues.apache.org/jira/browse/FLINK-33566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-33566:
-----------------------------------
    Labels: HBase-2.0 flink-connector-hbase pull-request-available  (was: 
HBase-2.0 flink-connector-hbase)

> HBase sql-connector needs overwrite the rowKey
> ----------------------------------------------
>
>                 Key: FLINK-33566
>                 URL: https://issues.apache.org/jira/browse/FLINK-33566
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / HBase
>    Affects Versions: 1.18.0
>         Environment: flink: 1.18.0
> hbase: 2.2.3
> flink-connector-hbase-2.2:3.0.0-1.18
>            Reporter: JankoWilliam
>            Priority: Major
>              Labels: HBase-2.0, flink-connector-hbase, pull-request-available
>             Fix For: hbase-3.0.0
>
>
> When I want to write label values of 50+to a rowkey column family in HBase 
> (all values are label values of 0/1), for example:
> {"id":"1111","q1":"0","q2":"1","q3":"0","q4":"1",...,"q49":"0","q50":"1"}
> Here are four label values for example:
> {"id":"1111","q1":"0","q2":"1","q3":"0","q4":"1"}
> {code:java}
> --source:
> CREATE TABLE kafka_table_(
>  `id` STRING,
>  `q1` STRING,
>  `q2` STRING,
>  `q3` STRING,
>  `q4` STRING
> )  WITH (
> ...
>    'connector'='kafka',
>    'format'='json',
> ...
> );
> --sink:
> CREATE TABLE hbase_table_ (
>  rowkey  STRING,
>  cf ROW<q1 INT,q2 INT,q3 INT,q4 INT>,
>  PRIMARY KEY (rowkey ) NOT ENFORCED
> ) WITH (
>   'connector' = 'my-hbase-2.2',
>   'table-name' = 'test_table',
>   'zookeeper.quorum' = '127.0.0.1'
> );
> --insert:
> insert into hbase_table_ 
> select 
>  id AS rowkey ,
>  ROW( cast(q1 as INT),cast(q2 as INT),cast(q3 as INT),cast(q4 as INT)) as cf
>  from kafka_table_  ;{code}
> hbase:
> hbase(main):016:0> scan 'test_table'
> ROW                                                        COLUMN+CELL        
>                                                                               
>                                                                           
>  1111                                                      column=cf:q1, 
> timestamp=0000000000001, value=\x00\x00\x00\x00                               
>                                                                               
>  
>  1111                                                      column=cf:q2, 
> timestamp=0000000000001, value=\x00\x00\x00\x01                               
>                                                                               
>  
>  1111                                                      column=cf:q3, 
> timestamp=0000000000001, value=\x00\x00\x00\x00                               
>                                                                               
>  
>  1111                                                      column=cf:q4, 
> timestamp=0000000000001, value=\x00\x00\x00\x01 
>  
> Upstream data has a fixed value of 50+k-v data, among which very few value 
> values are 1 (the default label value is 0). For example, only 1 or 2 values 
> are 1: q2=1, q4=1, so I want HBase to store the following values:
> hbase(main):016:0> scan 'test_table'
> ROW                                                        COLUMN+CELL        
>                                                                               
>                                                                           
>  1111                                                      column=cf:q2, 
> timestamp=0000000000001, value=\x00\x00\x00\x01                               
>                                                                               
>  1111                                                      column=cf:q4, 
> timestamp=0000000000001, value=\x00\x00\x00\x01 
>  
> When I use the "sink. ignore null value" keyword here, It just don't update 
> the null value, and downstream third parties will still read all the values 
> (such as 50+), but there are only 2 values that are truly 1:
> {code:java}
> --sink:
> CREATE TABLE hbase_table_ (
>  rowkey  STRING,
>  cf ROW<q1 INT,q2 INT,q3 INT,q4 INT>,
>  PRIMARY KEY (rowkey ) NOT ENFORCED
> ) WITH (
>   'connector' = 'my-hbase-2.2',
>   'table-name' = 'test_table',
>   'sink.ignore-null-value' = 'true',
>   'zookeeper.quorum' = '127.0.0.1'
> ); 
> --insert:
> insert into hbase_table_ 
> select 
>  id AS rowkey ,
>  ROW(
>  case when q1 <> '0' then cast(q1 as INT) else null end,
>  case when q2 <> '0' then cast(q2 as INT) else null end,
>  case when q3 <> '0' then cast(q3 as INT) else null end,
>  case when q4 <> '0' then cast(q4 as INT) else null end 
> ) as cf
>  from kafka_table_ ; {code}
> hbase(main):016:0> scan 'test_table'
> ROW                                                        COLUMN+CELL        
>                                                                               
>                                                                           
>  1111                                                      column=cf:q1, 
> timestamp=0000000000001, value=\x00\x00\x00\x00                               
>                                                                               
>  
>  1111                                                      column=cf:q2, 
> timestamp=0000000000002, value=\x00\x00\x00\x01                               
>                                                                               
>  
>  1111                                                      column=cf:q3, 
> timestamp=0000000000001, value=\x00\x00\x00\x00                               
>                                                                               
>  
>  1111                                                      column=cf:q4, 
> timestamp=0000000000002, value=\x00\x00\x00\x01 
>  
> There are no other configurations available, so I hope to have the function 
> of overwriting and writing rowKey, that is, deleting the rowkey before adding 
> new data.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to