LiuZeshan created FLINK-32139:
---------------------------------

             Summary: Data accidentally deleted and not deleted when upsert 
sink to hbase
                 Key: FLINK-32139
                 URL: https://issues.apache.org/jira/browse/FLINK-32139
             Project: Flink
          Issue Type: Bug
          Components: Connectors / HBase
            Reporter: LiuZeshan


h4. *Problem background*

We meet data accidental deletion and non deletion issues when synchronizing 
MySQL to HBase using MySQL-CDC and HBase connectors.
h3. Reproduction steps

1、The Flink job with 1 parallelism synchronize a MySQL table into HBase. 
SinkMaterializer is tunned off by setting {{table.exec.sink.upsert-materialize 
= 'NONE'}}。

 MySQL table schema is as follows。
CREATE TABLE `source_sample_1001` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(200) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  `weight` float DEFAULT NULL,
  PRIMARY KEY (`id`)
);
The source table definition in Flink is as follows.
CREATE TABLE `source_sample_1001` (
     `id` bigint,
     `name` String,
     `age` bigint,
     `weight` float,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc' ,
  'hostname' = '${ip}',
  'port' = '3306',
  'username' = '${user}',
  'password' = '${password}',
  'database-name' = 'testdb_0010',
  'table-name' = 'source_sample_1001'
);
 HBase sink table are created in {{testdb_0011}} namespace.
CREATE 'testdb_0011:source_sample_1001', 'data'
​
describe 'testdb_0011:source_sample_1001'
# describe output
Table testdb_0011:source_sample_1001 is ENABLED                                 
                                                                                
                                        
testdb_0011:source_sample_1001                                                  
                                                                                
                                        
COLUMN FAMILIES DESCRIPTION                                                     
                                                                                
                                        
{NAME => 'data', BLOOMFILTER => 'ROW', IN_MEMORY => 'false', VERSIONS => '1', 
KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', COMPRESSION => 
'NONE', TTL => 'FOREVER', MIN_VERSIONS => '0'
, BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}         
                                                                                
                                        
The sink table definition in Flink.
CREATE TABLE `hbase_sink1` (
    `id` STRING COMMENT 'unique id',
    `data` ROW<
        `name` string,
        `age` bigint,
        `weight` float
    >,
    primary key(`id`) not enforced
) WITH (
    'connector' = 'hbase-2.2',
    'table-name' = 'testdb_0011:source_sample_1001',
    'zookeeper.quorum' = '${hbase.zookeeper.quorum}'
);
DML in flink to synchronize data. 
INSERT INTO `hbase_sink1` SELECT
    REVERSE(CONCAT_WS('', CAST(`id` AS VARCHAR))) as `id`,
    ROW(`name`, `age`, `weight`)
FROM `source_sample_1001`;
2、Another flink job sink datagen data to the MySQL table {{source_sample_1001}} 
。id range from 1 to 10_000, that means source_sample_1001 will have at most 
10_000 records。
CREATE TABLE datagen_source (
     `id` int,
     `name` String,
     `age` int,
     `weight` float
) WITH (
  'connector' = 'datagen',
  'fields.id.kind' = 'random',
  'fields.id.min' = '1',
  'fields.id.max' = '10000',
  'fields.name.length' = '20',
  'rows-per-second' = '5000'
);
​
CREATE TABLE `source_sample_1001` (
     `id` bigint,
     `name` String,
     `age` bigint,
     `weight` float,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 
'jdbc:mysql://${ip}:3306/testdb_0010?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',
    'table-name' = 'source_sample_1001',
    'username' = '${user}',
    'password' = '${password}',
    'sink.buffer-flush.max-rows' = '500',
    'sink.buffer-flush.interval' = '1s'
);
​
-- dml 
INSERT INTO `source_sample_1001` SELECT * FROM `datagen_source`;
3、A bash script deletes the MySQL table {{source_sample_1001}} with batch 10.
#!/bin/bash
​
mysql1="mysql -h${ip} -u${user} -p${password}"
batch=10
​
for ((i=1; ;i++)); do
  echo "iteration $i start"
  for ((j=1; j<=10000; j+=10)); do
    $mysql1 -e "delete from testdb_0010.source_sample_1001 where id >= $j and 
id < $((j+10))"
  done
  echo "iteration $i end"
  sleep 10
done
4、Start the above two flink jobs and the bash script. Wait for several minutes, 
usually 5 minutes is enough. Please note that deleting data bash script is 
necessary for reproduce the problem.

5、Stop the bash script, and waiting for MySQL table to fill up with 10_000 data 
by the datagen flink job。And then stop datagen flink job. Waiting for the sink 
hbase job to read all the binlog of MySQL table {{source_sample_1001}}.

6、Check the hbase table and reproduce the issue of data loss. As shown below, 
67 records of data were lost in a test.
hbase(main):006:0> count  'testdb_0011:source_sample_1001'                      
                             
9933 row(s)
Took 0.8724 seconds                                                             
                                                                                
                                        
=> 9933
Find out a missing record of data and check the raw data in HBase.
hbase(main):008:0> get 'testdb_0011:source_sample_1001', '24'
COLUMN                                              CELL                        
                                                                                
                                        
0 row(s)
Took 0.0029 seconds                                                             
                                                                                
                                        
hbase(main):009:0> scan 'testdb_0011:source_sample_1001', \{RAW => true, 
VERSIONS => 1000, STARTROW => '24', STOPROW => '24'}
ROW                                                 COLUMN+CELL                 
                                                                                
                                        
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:44.884, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:44.884, value=3a8f571c25a9d9040ef3                   
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:43.769, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:43.769, value=5aada98281ee0a961841                   
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:42.902, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:42.902, value=599790a9a641e6121ab3                   
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:41.614, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:41.614, value=4ece6410d32959457f80                   
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:40.885, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:40.885, value=9edcfcf1c958a7e4ae2a                   
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:40.841, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:40.841, value=3d82dcf982d5bcd5b6b7                   
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:39.788, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:39.788, value=2888a338b65caaf15b30                   
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:35.799, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:35.799, value=a8d7549e18ef0c0e8674                   
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:35.688, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:35.688, value=ada7237e52d030dcef7a                   
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:35.650, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:35.650, value=482feed26918dcdc911e                   
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:34.885, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:34.885, value=36d6bdd585dbb65dedb7                   
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:33.905, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:33.905, value=6e15c4462f8435040700                   
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:33.803, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:33.803, value=d122df5afd4eac32da72                   
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:33.693, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:33.693, value=ed603d47fedb3852b520                   
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:31.784, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:31.784, value=1ebdd5fe6310850b8098                   
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:30.684, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:30.684, value=cc628ba45d1ad07fce2f                   
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:29.812, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:29.812, value=c1d4df6e987bdb3cd0a3                   
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:29.590, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:29.590, value=535557700ca01c6b6b1e                   
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:28.876, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:28.876, value=a63c2ebfefc82eab4bcf                   
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:28.565, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:28.565, value=dd2b24ff0dfa672c49ba                   
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:27.879, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:27.879, value=69dbe1287c2bc54781ab                   
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:27.699, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:27.699, value=775d06dcbf1148e665ee                   
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:24.209, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:24.209, value=e23c010ab06125c88870                   
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:22.480, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:20.716, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:18.678, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:17.720, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:16.858, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:16.682, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:15.753, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:14.571, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:11.572, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:09.681, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:08.792, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:05.888, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:05.754, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:03.626, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:02.652, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:01.790, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:17:00.986, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:16:59.797, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:16:58.982, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:16:58.781, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:16:58.626, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:16:58.149, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:16:56.610, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:16:51.655, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:16:51.458, type=Delete                                  
                                                  
 24                                                 column=data:name, 
timestamp=2023-05-20T21:16:44.860, type=Delete                                  
                                                  
1 row(s)
Took 0.1466 seconds                                                             
                     
7、Start the bash script to delete all data of the MySQL table. Waiting for the 
sink hbase job to read all the binlog of MySQL table {{source_sample_1001}}.

6、Check the hbase table and reproduce the issue of data no delete. As shown 
below, 6 records of data were not deleted in the test.
hbase(main):012:0> count  'testdb_0011:source_sample_1001'
6 row(s)
Took 0.5121 seconds                                                             
                                                                                
                                        
=> 6
Check the raw data of a record in HBase.
hbase(main):013:0> get 'testdb_0011:source_sample_1001', '3668'
COLUMN                                              CELL                        
                                                                                
                                        
 data:name                                          
timestamp=2023-05-20T21:17:26.714, value=ebb15f905622340d0351                   
                                                                    
1 row(s)
Took 0.0037 seconds                                                             
                                                                                
                                        
hbase(main):014:0> scan 'testdb_0011:source_sample_1001', \{RAW => true, 
VERSIONS => 1000, STARTROW => '3668', STOPROW => '3668'}
ROW                                                 COLUMN+CELL                 
                                                                                
                                        
 3668                                               column=data:name, 
timestamp=2023-05-20T21:17:45.728, type=Delete                                  
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:17:45.728, value=c675a12c7cbed27599c3                   
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:17:44.693, type=Delete                                  
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:17:44.693, value=413921aa1ac44f545954                   
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:17:43.854, type=Delete                                  
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:17:43.854, value=7d44b0efc0923e4035b7                   
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:17:41.721, type=Delete                                  
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:17:41.721, value=60bfaef81bf8efdf781a                   
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:17:40.763, type=Delete                                  
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:17:40.763, value=2c371f9cd3909dd3b3f8                   
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:17:37.872, type=Delete                                  
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:17:37.872, value=9e32087cb39065976e50                   
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:17:32.573, type=Delete                                  
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:17:32.573, value=708364bf84dad4a04170                   
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:17:26.811, type=Delete                                  
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:17:26.811, value=c0e8e11eed3f8410dea9                   
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:17:26.714, value=ebb15f905622340d0351                   
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:17:24.310, type=Delete                                  
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:17:24.310, value=21681a161ed2ccbe884e                   
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:17:23.508, type=Delete                                  
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:17:23.508, value=a1ef547a9efd57a7a0e2                   
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:17:22.788, type=Delete                                  
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:17:22.788, value=34e688060e6c40f4f83b                   
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:17:21.746, type=Delete                                  
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:17:17.761, type=Delete                                  
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:17:12.610, type=Delete                                  
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:17:11.909, type=Delete                                  
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:17:07.846, type=Delete                                  
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:17:06.901, type=Delete                                  
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:17:06.758, type=Delete                                  
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:17:06.569, type=Delete                                  
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:17:02.689, type=Delete                                  
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:17:00.344, type=Delete                                  
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:16:59.961, type=Delete                                  
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:16:59.415, type=Delete                                  
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:16:58.916, type=Delete                                  
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:16:58.781, type=Delete                                  
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:16:58.718, type=Delete                                  
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:16:58.339, type=Delete                                  
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:16:56.340, type=Delete                                  
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:16:55.883, type=Delete                                  
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:16:55.683, type=Delete                                  
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:16:55.056, type=Delete                                  
                                                  
 3668                                               column=data:name, 
timestamp=2023-05-20T21:16:46.845, type=Delete                                  
                                                  
1 row(s)
Took 0.0457 seconds                                                             
                              
h4. *Reason for the problem*

The [HBase 
connector|https://github.com/apache/flink/blob/06688f345f6793a8964ec00002175f44cda13c33/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java#L177]
 use the [Delete key 
type|https://github.com/apache/hbase/blob/c05ee564d3026688bcfdc456071059c7c8409694/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java#L380]
 [without 
timestamp|https://github.com/apache/flink/blob/06688f345f6793a8964ec00002175f44cda13c33/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java#L168]
 to {{delete the latest version of the specified column. This is an expensive 
call in that on the server-side, it first does a get to find the latest 
versions timestamp. Then it adds a delete using the fetched cells timestamp}}. 
Causing the following issues:

Problem 1: When writing update data, the timestamp of -U and +U added by the 
hbase server to the update message may be the same, and -U deleted the latest 
version of +U data, resulting in accidental deletion of the data. The problem 
reported by https://issues.apache.org/jira/browse/FLINK-28910

Problem 2: When there are multiple versions of HBase data, deleting the data 
will exposes earlier versions of the data, and resulting in the issue of data 
no delete.
h4. *Solution proposal* 

Use the [DeleteColumn key 
type|https://github.com/apache/hbase/blob/c05ee564d3026688bcfdc456071059c7c8409694/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java#L322]
 and set strongly increasing timestamp for 
[put|https://github.com/lzshlzsh/flink/blob/a2341810a244b97a3af32951e17efbc49f570cdd/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java#L138]
 and 
[delete|https://github.com/lzshlzsh/flink/blob/a2341810a244b97a3af32951e17efbc49f570cdd/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java#L170]
 mutation. The delete mutation will delete all versions of the specified column 
with a timestamp less than or equal to the specified.

I have test the proposed solution for seval days, and neither the data 
accidental deletion nor non deletion issues happen.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to