[ 
https://issues.apache.org/jira/browse/FLINK-32139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725853#comment-17725853
 ] 

LiuZeshan commented on FLINK-32139:
-----------------------------------

By adding logs to HBase, the root cause of unexpected data deletion was found. 
HBase version 2.4.5.

Firstly, writing data to HBase using put, delete, and put with the same 
timestamp results in the data being deleted.

!image-2023-05-24-23-07-23-978.png|width=309,height=100!

Verify as follows:
{code:java}
hbase:012:0> put 'source_sample_1001', 1, 'data:name', 'MyName', 1684937724000
Took 0.0294 seconds
hbase:013:0> scan 'source_sample_1001'
ROW                                         COLUMN+CELL
 1                                          column=data:name, 
timestamp=2023-05-24T22:15:24, value=MyName
1 row(s)
Took 0.0610 seconds
hbase:014:0> delete 'source_sample_1001', 1, 'data:name', 1684937724000
Took 0.0248 seconds
hbase:015:0> scan 'source_sample_1001'
ROW                                         COLUMN+CELL
0 row(s)
Took 0.0101 seconds
hbase:016:0> put 'source_sample_1001', 1, 'data:name', 'MyName', 1684937724000
Took 0.0139 seconds
hbase:017:0> scan 'source_sample_1001'
ROW                                         COLUMN+CELL
0 row(s)
Took 0.0164 seconds {code}
We add debug code in HBase as follows:
{code:java}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index d304fa3c8f..6dd4ef0706 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -3254,12 +3254,15 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
             if (!coprocessorHost.prePrepareTimeStampForDeleteVersion(mutation, 
cell,
                 byteNow, get)) {
               updateDeleteLatestVersionTimestamp(cell, get, count, byteNow);
+              LOG.info("-D[{}][{}]", mutation, cell.getTimestamp());
             }
           } else {
             updateDeleteLatestVersionTimestamp(cell, get, count, byteNow);
+            LOG.info("-D[{}][{}]", mutation, cell.getTimestamp());
           }
         } else {
           PrivateCellUtil.updateLatestStamp(cell, byteNow);
+          LOG.info("-D[{}][{}]", mutation, cell.getTimestamp());
         }
       }
     }
@@ -3874,6 +3877,7 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
       visitBatchOperations(true, miniBatchOp.getLastIndexExclusive(), (int 
index) -> {
         Mutation mutation = getMutation(index);
         if (mutation instanceof Put) {
+          LOG.info("+I[{}][{}]", mutation, timestamp);
           HRegion.updateCellTimestamps(familyCellMaps[index].values(), 
Bytes.toBytes(timestamp));
           miniBatchOp.incrementNumOfPuts();
         } else if (mutation instanceof Delete) { {code}
Obtained the detailed logs of the unexpectedly deleted rowKey 041. [^aa.log]

After preprocessing the log data.
{code:java}
+I["ts":"9223372036854775807"}][1684930529299]
-D["ts":"9223372036854775807"}][1684930529299]
+I["ts":"9223372036854775807"}][1684930529646]
-D["ts":"9223372036854775807"}][1684930529646]
+I["ts":"9223372036854775807"}][1684930534294]
-D["ts":"9223372036854775807"}][1684930534294]
+I["ts":"9223372036854775807"}][1684930536493]
-D["ts":"9223372036854775807"}][1684930536493]
+I["ts":"9223372036854775807"}][1684930539063]
-D["ts":"9223372036854775807"}][1684930539063]
+I["ts":"9223372036854775807"}][1684930542714]
-D["ts":"9223372036854775807"}][1684930542714]
+I["ts":"9223372036854775807"}][1684930545573]
-D["ts":"9223372036854775807"}][1684930545573]
+I["ts":"9223372036854775807"}][1684930559779]
-D["ts":"9223372036854775807"}][1684930559779]
+I["ts":"9223372036854775807"}][1684930561168]
-D["ts":"9223372036854775807"}][1684930561168]
+I["ts":"9223372036854775807"}][1684930561168]
-D["ts":"9223372036854775807"}][1684930564624]
+I["ts":"9223372036854775807"}][1684930564624]
-D["ts":"9223372036854775807"}][1684930565135]
+I["ts":"9223372036854775807"}][1684930565135]
-D["ts":"9223372036854775807"}][1684930568112]
+I["ts":"9223372036854775807"}][1684930568112]
-D["ts":"9223372036854775807"}][1684930575200]
+I["ts":"9223372036854775807"}][1684930575200]
-D["ts":"9223372036854775807"}][1684930577432]
+I["ts":"9223372036854775807"}][1684930577432]
-D["ts":"9223372036854775807"}][1684930584437]
+I["ts":"9223372036854775807"}][1684930584437]
-D["ts":"9223372036854775807"}][1684930593148]
+I["ts":"9223372036854775807"}][1684930593148]
-D["ts":"9223372036854775807"}][1684930599743]
+I["ts":"9223372036854775807"}][1684930599743]
-D["ts":"9223372036854775807"}][1684930605087]
+I["ts":"9223372036854775807"}][1684930605087]
-D["ts":"9223372036854775807"}][1684930607430]
+I["ts":"9223372036854775807"}][1684930607430]
-D["ts":"9223372036854775807"}][1684930609912]
+I["ts":"9223372036854775807"}][1684930609912]
-D["ts":"9223372036854775807"}][1684930612221]
+I["ts":"9223372036854775807"}][1684930612221]
-D["ts":"9223372036854775807"}][1684930613310]
+I["ts":"9223372036854775807"}][1684930613310]
-D["ts":"9223372036854775807"}][1684930616447]
+I["ts":"9223372036854775807"}][1684930616447]
-D["ts":"9223372036854775807"}][1684930621082]
+I["ts":"9223372036854775807"}][1684930621082]
-D["ts":"9223372036854775807"}][1684930642614]
+I["ts":"9223372036854775807"}][1684930642614] {code}
The HBase RAW data for the unexpectedly deleted rowKey is as follows (obtained 
from the scan command):
{code:java}
 041                                             column=data:name, 
timestamp=2023-05-24T20:17:22.614, type=Delete
 041                                             column=data:name, 
timestamp=2023-05-24T20:17:22.614, value=7a6820768c7af73ee097
 041                                             column=data:name, 
timestamp=2023-05-24T20:17:01.082, type=Delete
 041                                             column=data:name, 
timestamp=2023-05-24T20:17:01.082, value=2b787b3910a36d3c04f5
 041                                             column=data:name, 
timestamp=2023-05-24T20:16:56.447, type=Delete
 041                                             column=data:name, 
timestamp=2023-05-24T20:16:56.447, value=999115f88f5fe66b6cca
 041                                             column=data:name, 
timestamp=2023-05-24T20:16:53.310, type=Delete
 041                                             column=data:name, 
timestamp=2023-05-24T20:16:53.310, value=0302da679440e7743e4a
 041                                             column=data:name, 
timestamp=2023-05-24T20:16:52.221, type=Delete
 041                                             column=data:name, 
timestamp=2023-05-24T20:16:52.221, value=352b9c11acd86548c126
 041                                             column=data:name, 
timestamp=2023-05-24T20:16:49.912, type=Delete
 041                                             column=data:name, 
timestamp=2023-05-24T20:16:49.912, value=b254fef72188d54908ae
 041                                             column=data:name, 
timestamp=2023-05-24T20:16:47.430, type=Delete
 041                                             column=data:name, 
timestamp=2023-05-24T20:16:47.430, value=9c0d6d0faa218ab2acd9
 041                                             column=data:name, 
timestamp=2023-05-24T20:16:45.087, type=Delete
 041                                             column=data:name, 
timestamp=2023-05-24T20:16:45.087, value=73a40d2892839f657579
 041                                             column=data:name, 
timestamp=2023-05-24T20:16:39.743, type=Delete
 041                                             column=data:name, 
timestamp=2023-05-24T20:16:33.148, type=Delete
 041                                             column=data:name, 
timestamp=2023-05-24T20:16:24.437, type=Delete
 041                                             column=data:name, 
timestamp=2023-05-24T20:16:17.432, type=Delete
 041                                             column=data:name, 
timestamp=2023-05-24T20:16:15.200, type=Delete
 041                                             column=data:name, 
timestamp=2023-05-24T20:16:08.112, type=Delete
 041                                             column=data:name, 
timestamp=2023-05-24T20:16:05.135, type=Delete
 041                                             column=data:name, 
timestamp=2023-05-24T20:16:04.624, type=Delete
 041                                             column=data:name, 
timestamp=2023-05-24T20:16:01.168, type=Delete
 041                                             column=data:name, 
timestamp=2023-05-24T20:15:59.779, type=Delete
 041                                             column=data:name, 
timestamp=2023-05-24T20:15:45.573, type=Delete
 041                                             column=data:name, 
timestamp=2023-05-24T20:15:42.714, type=Delete
 041                                             column=data:name, 
timestamp=2023-05-24T20:15:39.063, type=Delete
 041                                             column=data:name, 
timestamp=2023-05-24T20:15:36.493, type=Delete
 041                                             column=data:name, 
timestamp=2023-05-24T20:15:34.294, type=Delete
 041                                             column=data:name, 
timestamp=2023-05-24T20:15:29.646, type=Delete
 041                                             column=data:name, 
timestamp=2023-05-24T20:15:29.299, type=Delete

# translate timestamp to microsecond
1684930642614 type=Delete
1684930642614 value=7a6820768c7af73ee097
1684930621082 type=Delete
1684930621082 value=2b787b3910a36d3c04f5
1684930616447 type=Delete
1684930616447 value=999115f88f5fe66b6cca
1684930613310 type=Delete
1684930613310 value=0302da679440e7743e4a
1684930612221 type=Delete
1684930612221 value=352b9c11acd86548c126
1684930609912 type=Delete
1684930609912 value=b254fef72188d54908ae
1684930607430 type=Delete
1684930607430 value=9c0d6d0faa218ab2acd9
1684930605087 type=Delete
1684930605087 value=73a40d2892839f657579
1684930599743 type=Delete
1684930593148 type=Delete
1684930584437 type=Delete
1684930577432 type=Delete
1684930575200 type=Delete
1684930568112 type=Delete
1684930565135 type=Delete
1684930564624 type=Delete
1684930561168 type=Delete
1684930559779 type=Delete
1684930545573 type=Delete
1684930542714 type=Delete
1684930539063 type=Delete
1684930536493 type=Delete
1684930534294 type=Delete
1684930529646 type=Delete
1684930529299 type=Delete {code}
We know that the delete key type of Delete mutation with timestamp is used to 
delete the data of the specified timestamp. Analyzing the above two datas, 
there is no obvious evidence from the scan's RAW data.

We need to analyze the HBase log data. As shown in the following image, the 
first row of data is the snapshot data in MySQL. The following -D/+I pairs are 
MySQL's UPDATE (or DELETE/INSERT) events, and for each UPDATE event, the 
timestamp of -D is the same as the timestamp of the previous +I, that is, 
delete the data since the last update, and then insert the data of the current 
timestamp. If the timestamp of this update is the same as the timestamp of the 
previous update (lines 17, 18, and 19 in the figure below), the final effect is 
accidental deletion of the data. Afterwards, all UPDATE's -D/+I have the same 
timestamp, resulting in the data being deleted.

!image-2023-05-24-23-16-59-508.png|width=756,height=435!

> Data accidentally deleted and not deleted when upsert sink to hbase
> -------------------------------------------------------------------
>
>                 Key: FLINK-32139
>                 URL: https://issues.apache.org/jira/browse/FLINK-32139
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / HBase
>            Reporter: LiuZeshan
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: aa.log, image-2023-05-24-23-07-23-978.png, 
> image-2023-05-24-23-16-59-508.png
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> h4. *Problem background*
> We meet data accidental deletion and non deletion issues when synchronizing 
> MySQL cdc data to HBase using HBase connectors.
> h3. Reproduction steps
> 1、The Flink job with 1 parallelism synchronize a MySQL table into HBase. 
> SinkUpsertMaterializer is tunned off by setting 
> {{{}table.exec.sink.upsert-materialize = 'NONE'{}}}。
> MySQL table schema is as follows。
> {code:java}
> CREATE TABLE `source_sample_1001` (
> `id` int(11) NOT NULL AUTO_INCREMENT,
> `name` varchar(200) DEFAULT NULL,
> `age` int(11) DEFAULT NULL,
> `weight` float DEFAULT NULL,
> PRIMARY KEY (`id`)
> );{code}
> The source table definition in Flink is as follows.
> {code:java}
> CREATE TABLE `source_sample_1001` (
>     `id` bigint,
>     `name` String,
>     `age` bigint,
>     `weight` float,
>   PRIMARY KEY (`id`) NOT ENFORCED
> ) WITH (
> 'connector' = 'mysql-cdc' ,
> 'hostname' = '${ip}',
> 'port' = '3306',
> 'username' = '${user}',
> 'password' = '${password}',
> 'database-name' = 'testdb_0010',
> 'table-name' = 'source_sample_1001'
> );{code}
> HBase sink table are created in {{testdb_0011}} namespace.
> {code:java}
> CREATE 'testdb_0011:source_sample_1001', 'data'
> ​
> describe 'testdb_0011:source_sample_1001'
>  
> # describe output
> Table testdb_0011:source_sample_1001 is ENABLED                               
>                                                                               
>                                             
> testdb_0011:source_sample_1001                                                
>                                                                               
>                                             
> COLUMN FAMILIES DESCRIPTION                                                   
>                                                                               
>                                             {NAME => 'data', BLOOMFILTER => 
> 'ROW', IN_MEMORY => 'false', VERSIONS => '1', KEEP_DELETED_CELLS => 'FALSE', 
> DATA_BLOCK_ENCODING => 'NONE', COMPRESSION => 'NONE', TTL => 'FOREVER', 
> MIN_VERSIONS => '0' , BLOCKCACHE => 'true', BLOCKSIZE => '65536', 
> REPLICATION_SCOPE => '0'}
> {code}
>  
>                                                                               
>                                                   
> The sink table definition in Flink.
> {code:java}
> CREATE TABLE `hbase_sink1` (
>     `id` STRING COMMENT 'unique id',
>     `data` ROW<
>         `name` string,
>         `age` string,
>         `weight` string
>     >,
>     primary key(`id`) not enforced
> ) WITH (
>   'connector' = 'hbase-2.2',
>   'table-name' = 'testdb_0011:source_sample_1001',
>   'zookeeper.quorum' = '${hbase.zookeeper.quorum}'
> );{code}
> DML in flink to synchronize data.
> {code:java}
> INSERT INTO `hbase_sink1` SELECT
>     `id`, row(`name`, `age`, `weight`)
> FROM (
>     SELECT
>         REVERSE(CONCAT_WS('', CAST(id AS VARCHAR ))) as id,
>         `name`, cast(`age` as varchar) as `age`, cast(`weight` as varchar) as 
> `weight`
>     FROM `source_sample_1001`
> ) t;{code}
> 2、Another flink job sinks datagen data to the MySQL table 
> {{source_sample_1001}} 。id range from 1 to 10_000, that means 
> source_sample_1001 will have at most 10_000 records。
> {code:java}
> CREATE TABLE datagen_source (
>     `id` int,
>     `name` String,
>     `age` int,
>     `weight` int
> ) WITH (
>    'connector' = 'datagen',
>   'fields.id.kind' = 'random',
>   'fields.id.min' = '1',
>   'fields.id.max' = '10000',
>   'fields.name.length' = '20',
>   'fields.age.min' = '1',
>   'fields.age.max' = '150',
>   'fields.weight.min' = '5',
>   'fields.weight.max' = '300',
>   'rows-per-second' = '5000'
> );
> ​
> CREATE TABLE `source_sample_1001` (
>     `id` bigint,
>     `name` String,
>     `age` bigint,
>     `weight` float,
>   PRIMARY KEY (`id`) NOT ENFORCED
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 
> 'jdbc:mysql://${ip}:3306/testdb_0010?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',
>   'table-name' = 'source_sample_1001',
>   'username' = '${user}',
>   'password' = '${password}',
>   'sink.buffer-flush.max-rows' = '500',
>   'sink.buffer-flush.interval' = '1s'
> );
> ​
> -- dml
> INSERT INTO `source_sample_1001` SELECT `id`, `name`, `age`, cast(`weight` as 
> float) FROM `datagen_source`;{code}
> 3、A bash script deletes the MySQL table {{source_sample_1001}} with batch 10.
> {code:java}
> #!/bin/bash
> ​
> mysql1="mysql -h${ip} -u${user} -p${password}"
> batch=10
> ​
> for ((i=1; ;i++)); do
> echo "iteration $i start"
> for ((j=1; j<=10000; j+=10)); do
>   $mysql1 -e "delete from testdb_0010.source_sample_1001 where id >= $j and 
> id < $((j+10))"
> done
> echo "iteration $i end"
> sleep 10
> done{code}
> 4、Start the above two flink jobs and the bash script. Wait for several 
> minutes, usually 5 minutes is enough. Please note that deleting data bash 
> script is necessary for reproduce the problem.
> 5、Stop the bash script, and waiting for MySQL table to fill up with 10_000 
> data by the datagen flink job。And then stop datagen flink job. Waiting for 
> the sink hbase job to read all the binlog of MySQL table 
> {{{}source_sample_1001{}}}.
> 6、Check the hbase table and reproduce the issue of data loss. As shown below, 
> 67 records were lost in a test.
> {code:java}
> hbase(main):006:0> count 'testdb_0011:source_sample_1001'                     
>                               
> 9933 row(s)
> Took 0.8724 seconds                                                           
>                                                                               
>                                             
> => 9933{code}
> Find out a missing record and check the raw data in HBase.
> {code:java}
> hbase(main):008:0> get 'testdb_0011:source_sample_1001', '24'
> COLUMN                                             CELL                       
>                                                                               
>                                            
> 0 row(s)
> Took 0.0029 seconds                                                           
>                                                                               
>                                             
> hbase(main):009:0> scan 'testdb_0011:source_sample_1001', {RAW => true, 
> VERSIONS => 1000, STARTROW => '24', STOPROW => '24'}
> ROW                                                 COLUMN+CELL               
>                                                                               
>                                             
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:44.884, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:44.884, value=3a8f571c25a9d9040ef3                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:43.769, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:43.769, value=5aada98281ee0a961841                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:42.902, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:42.902, value=599790a9a641e6121ab3                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:41.614, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:41.614, value=4ece6410d32959457f80                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:40.885, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:40.885, value=9edcfcf1c958a7e4ae2a                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:40.841, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:40.841, value=3d82dcf982d5bcd5b6b7                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:39.788, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:39.788, value=2888a338b65caaf15b30                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:35.799, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:35.799, value=a8d7549e18ef0c0e8674                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:35.688, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:35.688, value=ada7237e52d030dcef7a                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:35.650, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:35.650, value=482feed26918dcdc911e                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:34.885, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:34.885, value=36d6bdd585dbb65dedb7                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:33.905, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:33.905, value=6e15c4462f8435040700                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:33.803, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:33.803, value=d122df5afd4eac32da72                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:33.693, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:33.693, value=ed603d47fedb3852b520                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:31.784, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:31.784, value=1ebdd5fe6310850b8098                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:30.684, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:30.684, value=cc628ba45d1ad07fce2f                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:29.812, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:29.812, value=c1d4df6e987bdb3cd0a3                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:29.590, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:29.590, value=535557700ca01c6b6b1e                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:28.876, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:28.876, value=a63c2ebfefc82eab4bcf                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:28.565, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:28.565, value=dd2b24ff0dfa672c49ba                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:27.879, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:27.879, value=69dbe1287c2bc54781ab                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:27.699, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:27.699, value=775d06dcbf1148e665ee                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:24.209, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:24.209, value=e23c010ab06125c88870                 
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:22.480, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:20.716, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:18.678, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:17.720, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:16.858, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:16.682, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:15.753, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:14.571, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:11.572, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:09.681, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:08.792, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:05.888, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:05.754, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:03.626, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:02.652, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:01.790, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:17:00.986, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:16:59.797, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:16:58.982, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:16:58.781, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:16:58.626, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:16:58.149, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:16:56.610, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:16:51.655, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:16:51.458, type=Delete                                
>                                                     
> 24                                                 column=data:name, 
> timestamp=2023-05-20T21:16:44.860, type=Delete                                
>                                                     
> 1 row(s)
> Took 0.1466 seconds                                                           
>                        {code}
> 7、Start the bash script to delete all data of the MySQL table. Waiting for 
> the sink hbase job to read all the binlog of MySQL table 
> {{{}source_sample_1001{}}}.
> 6、Check the hbase table and reproduce the issue of data no deletion. As shown 
> below, 6 records were not deleted in the test.
> {code:java}
> hbase(main):012:0> count 'testdb_0011:source_sample_1001'
> 6 row(s)
> Took 0.5121 seconds                                                           
>                                                                               
>                                             
> => 6{code}
> Check the raw data of a record in HBase.
> {code:java}
> hbase(main):013:0> get 'testdb_0011:source_sample_1001', '3668'
> COLUMN                                             CELL                       
>                                                                               
>                                            
> data:name                                         
> timestamp=2023-05-20T21:17:26.714, value=ebb15f905622340d0351                 
>                                                                       
> 1 row(s)
> Took 0.0037 seconds                                                           
>                                                                               
>                                             
> hbase(main):014:0> scan 'testdb_0011:source_sample_1001', {RAW => true, 
> VERSIONS => 1000, STARTROW => '3668', STOPROW => '3668'}
> ROW                                                 COLUMN+CELL               
>                                                                               
>                                             
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:45.728, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:45.728, value=c675a12c7cbed27599c3                 
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:44.693, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:44.693, value=413921aa1ac44f545954                 
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:43.854, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:43.854, value=7d44b0efc0923e4035b7                 
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:41.721, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:41.721, value=60bfaef81bf8efdf781a                 
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:40.763, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:40.763, value=2c371f9cd3909dd3b3f8                 
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:37.872, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:37.872, value=9e32087cb39065976e50                 
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:32.573, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:32.573, value=708364bf84dad4a04170                 
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:26.811, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:26.811, value=c0e8e11eed3f8410dea9                 
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:26.714, value=ebb15f905622340d0351                 
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:24.310, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:24.310, value=21681a161ed2ccbe884e                 
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:23.508, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:23.508, value=a1ef547a9efd57a7a0e2                 
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:22.788, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:22.788, value=34e688060e6c40f4f83b                 
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:21.746, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:17.761, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:12.610, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:11.909, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:07.846, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:06.901, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:06.758, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:06.569, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:02.689, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:17:00.344, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:16:59.961, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:16:59.415, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:16:58.916, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:16:58.781, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:16:58.718, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:16:58.339, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:16:56.340, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:16:55.883, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:16:55.683, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:16:55.056, type=Delete                                
>                                                     
> 3668                                               column=data:name, 
> timestamp=2023-05-20T21:16:46.845, type=Delete                                
>                                                     
> 1 row(s)
> Took 0.0457 seconds                                                           
>                                {code}
> h4. *Reason for the problem*
> The [HBase 
> connector|https://github.com/apache/flink/blob/06688f345f6793a8964ec00002175f44cda13c33/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java#L177]
>  use the [Delete key 
> type|https://github.com/apache/hbase/blob/c05ee564d3026688bcfdc456071059c7c8409694/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java#L380]
>  [without 
> timestamp|https://github.com/apache/flink/blob/06688f345f6793a8964ec00002175f44cda13c33/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java#L168]
>  to {{{}delete the latest version of the specified column. This is an 
> expensive call in that on the server-side, it first does a get to find the 
> latest versions timestamp. Then it adds a delete using the fetched cells 
> timestamp{}}}. Causing the following issues:
> Problem 1: When writing update data, the timestamp of -U and +U added by the 
> hbase server to the update message may be the same, and -U deleted the latest 
> version of +U data, resulting in accidental deletion of the data. The problem 
> was also reported by https://issues.apache.org/jira/browse/FLINK-28910
> Problem 2: When there are multiple versions of HBase data, deleting the data 
> will exposes earlier versions of the data, and resulting in the issue of data 
> no deletion.
> h4. *Solution proposal* 
> Use the [DeleteColumn key 
> type|https://github.com/apache/hbase/blob/c05ee564d3026688bcfdc456071059c7c8409694/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java#L322]
>  and set strongly increasing timestamp for 
> [put|https://github.com/lzshlzsh/flink/blob/a2341810a244b97a3af32951e17efbc49f570cdd/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java#L138]
>  and 
> [delete|https://github.com/lzshlzsh/flink/blob/a2341810a244b97a3af32951e17efbc49f570cdd/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java#L170]
>  mutation. The delete mutation will delete all versions of the specified 
> column with a timestamp less than or equal to the specified.
> I have test the proposed solution for several days, and neither the data 
> accidental deletion nor no deletion issues happen.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to