[ 
https://issues.apache.org/jira/browse/FLINK-32139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17724543#comment-17724543
 ] 

LiuZeshan commented on FLINK-32139:
-----------------------------------

[~ferenc-csaky] [~martijnvisser] Could you help to take a look at this issue.

cc [~mgergely] [~Leonard] [~jark] 

> Data accidentally deleted and not deleted when upsert sink to hbase
> -------------------------------------------------------------------
>
>                 Key: FLINK-32139
>                 URL: https://issues.apache.org/jira/browse/FLINK-32139
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / HBase
>            Reporter: LiuZeshan
>            Priority: Major
>              Labels: pull-request-available
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> h4. *Problem background*
> We meet data accidental deletion and non deletion issues when synchronizing 
> MySQL to HBase using MySQL-CDC and HBase connectors.
> h3. Reproduction steps
> 1、The Flink job with 1 parallelism synchronize a MySQL table into HBase. 
> SinkMaterializer is tunned off by setting 
> {{{}table.exec.sink.upsert-materialize = 'NONE'{}}}。
> MySQL table schema is as follows。
> {code:java}
> CREATE TABLE `source_sample_1001` (
> `id` int(11) NOT NULL AUTO_INCREMENT,
> `name` varchar(200) DEFAULT NULL,
> `age` int(11) DEFAULT NULL,
> `weight` float DEFAULT NULL,
> PRIMARY KEY (`id`)
> );{code}
> The source table definition in Flink is as follows.
> {code:java}
> CREATE TABLE `source_sample_1001` (
>     `id` bigint,
>     `name` String,
>     `age` bigint,
>     `weight` float,
>   PRIMARY KEY (`id`) NOT ENFORCED
> ) WITH (
> 'connector' = 'mysql-cdc' ,
> 'hostname' = '${ip}',
> 'port' = '3306',
> 'username' = '${user}',
> 'password' = '${password}',
> 'database-name' = 'testdb_0010',
> 'table-name' = 'source_sample_1001'
> );{code}
> HBase sink table are created in {{testdb_0011}} namespace.
> {code:java}
> CREATE 'testdb_0011:source_sample_1001', 'data'
> ​
> describe 'testdb_0011:source_sample_1001'
>  
> # describe output
> Table testdb_0011:source_sample_1001 is ENABLED                               
>                                                                               
>                                             
> testdb_0011:source_sample_1001                                                
>                                                                               
>                                             
> COLUMN FAMILIES DESCRIPTION                                                   
>                                                                               
>                                             {NAME => 'data', BLOOMFILTER => 
> 'ROW', IN_MEMORY => 'false', VERSIONS => '1', KEEP_DELETED_CELLS => 'FALSE', 
> DATA_BLOCK_ENCODING => 'NONE', COMPRESSION => 'NONE', TTL => 'FOREVER', 
> MIN_VERSIONS => '0' , BLOCKCACHE => 'true', BLOCKSIZE => '65536', 
> REPLICATION_SCOPE => '0'}
> {code}
>  
>                                                                               
>                                                   
> The sink table definition in Flink.
> {code:java}
> CREATE TABLE `hbase_sink1` (
>   `id` STRING COMMENT 'unique id',
>   `data` ROW<
>       `name` string,
>       `age` bigint,
>       `weight` float
>   >,
>   primary key(`id`) not enforced
> ) WITH (
>   'connector' = 'hbase-2.2',
>   'table-name' = 'testdb_0011:source_sample_1001',
>   'zookeeper.quorum' = '${hbase.zookeeper.quorum}'
> );{code}
> DML in flink to synchronize data.
> {code:java}
> INSERT INTO `hbase_sink1` SELECT
>   REVERSE(CONCAT_WS('', CAST(`id` AS VARCHAR))) as `id`,
>   ROW(`name`, `age`, `weight`)
> FROM `source_sample_1001`;{code}
> 2、Another flink job sink datagen data to the MySQL table 
> {{source_sample_1001}} 。id range from 1 to 10_000, that means 
> source_sample_1001 will have at most 10_000 records。
> {code:java}
> CREATE TABLE datagen_source (
>     `id` int,
>     `name` String,
>     `age` int,
>     `weight` float
> ) WITH (
> 'connector' = 'datagen',
> 'fields.id.kind' = 'random',
> 'fields.id.min' = '1',
> 'fields.id.max' = '10000',
> 'fields.name.length' = '20',
> 'rows-per-second' = '5000'
> );
> ​
> CREATE TABLE `source_sample_1001` (
>     `id` bigint,
>     `name` String,
>     `age` bigint,
>     `weight` float,
>   PRIMARY KEY (`id`) NOT ENFORCED
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 
> 'jdbc:mysql://${ip}:3306/testdb_0010?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',
>   'table-name' = 'source_sample_1001',
>   'username' = '${user}',
>   'password' = '${password}',
>   'sink.buffer-flush.max-rows' = '500',
>   'sink.buffer-flush.interval' = '1s'
> );
> ​
> -- dml
> INSERT INTO `source_sample_1001` SELECT * FROM `datagen_source`;
> {code}
> 3、A bash script deletes the MySQL table {{source_sample_1001}} with batch 10.
> {code:java}
> #!/bin/bash
> ​
> mysql1="mysql -h${ip} -u${user} -p${password}"
> batch=10
> ​
> for ((i=1; ;i++)); do
> echo "iteration $i start"
> for ((j=1; j<=10000; j+=10)); do
>   $mysql1 -e "delete from testdb_0010.source_sample_1001 where id >= $j and 
> id < $((j+10))"
> done
> echo "iteration $i end"
> sleep 10
> done{code}
> 4、Start the above two flink jobs and the bash script. Wait for several 
> minutes, usually 5 minutes is enough. Please note that deleting data bash 
> script is necessary for reproduce the problem.
> 5、Stop the bash script, and waiting for MySQL table to fill up with 10_000 
> data by the datagen flink job。And then stop datagen flink job. Waiting for 
> the sink hbase job to read all the binlog of MySQL table 
> {{{}source_sample_1001{}}}.
> 6、Check the hbase table and reproduce the issue of data loss. As shown below, 
> 67 records of data were lost in a test.
> {code:java}
> hbase(main):006:0> count 'testdb_0011:source_sample_1001'                     
>                               
> 9933 row(s)
> Took 0.8724 seconds                                                           
>                                                                               
>                                             
> => 9933{code}
> Find out a missing record of data and check the raw data in HBase.
> {code:java}
> hbase(main):008:0> get 'testdb_0011:source_sample_1001', '24'
> COLUMN                                             CELL                       
>                                                                               
>                                            
> 0 row(s)
> Took 0.0029 seconds                                                           
>                                                                               
>                                             
> hbase(main):009:0> scan 'testdb_0011:source_sample_1001', {RAW => true, 
> VERSIONS => 1000, STARTROW => '24', STOPROW => '24'}
> ROW                                                 COLUMN+CELL               
>                                                                               
>                                             
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:44.884, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:44.884, value=3a8f571c25a9d9040ef3                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:43.769, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:43.769, value=5aada98281ee0a961841                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:42.902, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:42.902, value=599790a9a641e6121ab3                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:41.614, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:41.614, value=4ece6410d32959457f80                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:40.885, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:40.885, value=9edcfcf1c958a7e4ae2a                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:40.841, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:40.841, value=3d82dcf982d5bcd5b6b7                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:39.788, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:39.788, value=2888a338b65caaf15b30                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:35.799, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:35.799, value=a8d7549e18ef0c0e8674                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:35.688, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:35.688, value=ada7237e52d030dcef7a                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:35.650, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:35.650, value=482feed26918dcdc911e                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:34.885, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:34.885, value=36d6bdd585dbb65dedb7                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:33.905, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:33.905, value=6e15c4462f8435040700                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:33.803, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:33.803, value=d122df5afd4eac32da72                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:33.693, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:33.693, value=ed603d47fedb3852b520                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:31.784, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:31.784, value=1ebdd5fe6310850b8098                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:30.684, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:30.684, value=cc628ba45d1ad07fce2f                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:29.812, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:29.812, value=c1d4df6e987bdb3cd0a3                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:29.590, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:29.590, value=535557700ca01c6b6b1e                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:28.876, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:28.876, value=a63c2ebfefc82eab4bcf                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:28.565, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:28.565, value=dd2b24ff0dfa672c49ba                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:27.879, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:27.879, value=69dbe1287c2bc54781ab                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:27.699, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:27.699, value=775d06dcbf1148e665ee                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:24.209, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:24.209, value=e23c010ab06125c88870                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:22.480, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:20.716, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:18.678, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:17.720, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:16.858, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:16.682, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:15.753, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:14.571, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:11.572, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:09.681, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:08.792, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:05.888, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:05.754, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:03.626, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:02.652, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:01.790, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:00.986, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:16:59.797, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:16:58.982, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:16:58.781, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:16:58.626, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:16:58.149, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:16:56.610, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:16:51.655, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:16:51.458, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:16:44.860, type=Delete                                
>                                                     
> 1 row(s)
> Took 0.1466 seconds                                                           
>                        {code}
> 7、Start the bash script to delete all data of the MySQL table. Waiting for 
> the sink hbase job to read all the binlog of MySQL table 
> {{{}source_sample_1001{}}}.
> 6、Check the hbase table and reproduce the issue of data no delete. As shown 
> below, 6 records of data were not deleted in the test.
> {code:java}
> hbase(main):012:0> count 'testdb_0011:source_sample_1001'
> 6 row(s)
> Took 0.5121 seconds                                                           
>                                                                               
>                                             
> => 6{code}
> Check the raw data of a record in HBase.
> {code:java}
> hbase(main):013:0> get 'testdb_0011:source_sample_1001', '3668'
> COLUMN                                             CELL                       
>                                                                               
>                                            
> data:name                                         
> timestamp=2023-05-20T21:17:26.714, value=ebb15f905622340d0351                 
>                                                                       
> 1 row(s)
> Took 0.0037 seconds                                                           
>                                                                               
>                                             
> hbase(main):014:0> scan 'testdb_0011:source_sample_1001', {RAW => true, 
> VERSIONS => 1000, STARTROW => '3668', STOPROW => '3668'}
> ROW                                                 COLUMN+CELL               
>                                                                               
>                                             
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:45.728, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:45.728, value=c675a12c7cbed27599c3                 
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:44.693, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:44.693, value=413921aa1ac44f545954                 
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:43.854, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:43.854, value=7d44b0efc0923e4035b7                 
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:41.721, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:41.721, value=60bfaef81bf8efdf781a                 
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:40.763, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:40.763, value=2c371f9cd3909dd3b3f8                 
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:37.872, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:37.872, value=9e32087cb39065976e50                 
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:32.573, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:32.573, value=708364bf84dad4a04170                 
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:26.811, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:26.811, value=c0e8e11eed3f8410dea9                 
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:26.714, value=ebb15f905622340d0351                 
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:24.310, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:24.310, value=21681a161ed2ccbe884e                 
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:23.508, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:23.508, value=a1ef547a9efd57a7a0e2                 
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:22.788, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:22.788, value=34e688060e6c40f4f83b                 
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:21.746, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:17.761, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:12.610, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:11.909, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:07.846, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:06.901, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:06.758, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:06.569, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:02.689, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:00.344, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:16:59.961, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:16:59.415, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:16:58.916, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:16:58.781, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:16:58.718, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:16:58.339, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:16:56.340, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:16:55.883, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:16:55.683, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:16:55.056, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:16:46.845, type=Delete                                
>                                                     
> 1 row(s)
> Took 0.0457 seconds                                                           
>                                {code}
> h4. *Reason for the problem*
> The [HBase 
> connector|https://github.com/apache/flink/blob/06688f345f6793a8964ec00002175f44cda13c33/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java#L177]
>  use the [Delete key 
> type|https://github.com/apache/hbase/blob/c05ee564d3026688bcfdc456071059c7c8409694/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java#L380]
>  [without 
> timestamp|https://github.com/apache/flink/blob/06688f345f6793a8964ec00002175f44cda13c33/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java#L168]
>  to {{{}delete the latest version of the specified column. This is an 
> expensive call in that on the server-side, it first does a get to find the 
> latest versions timestamp. Then it adds a delete using the fetched cells 
> timestamp{}}}. Causing the following issues:
> Problem 1: When writing update data, the timestamp of -U and +U added by the 
> hbase server to the update message may be the same, and -U deleted the latest 
> version of +U data, resulting in accidental deletion of the data. The problem 
> was also reported by https://issues.apache.org/jira/browse/FLINK-28910
> Problem 2: When there are multiple versions of HBase data, deleting the data 
> will exposes earlier versions of the data, and resulting in the issue of data 
> no delete.
> h4. *Solution proposal* 
> Use the [DeleteColumn key 
> type|https://github.com/apache/hbase/blob/c05ee564d3026688bcfdc456071059c7c8409694/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java#L322]
>  and set strongly increasing timestamp for 
> [put|https://github.com/lzshlzsh/flink/blob/a2341810a244b97a3af32951e17efbc49f570cdd/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java#L138]
>  and 
> [delete|https://github.com/lzshlzsh/flink/blob/a2341810a244b97a3af32951e17efbc49f570cdd/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java#L170]
>  mutation. The delete mutation will delete all versions of the specified 
> column with a timestamp less than or equal to the specified.
> I have test the proposed solution for several days, and neither the data 
> accidental deletion nor non deletion issues happen.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to