[ https://issues.apache.org/jira/browse/FLINK-32139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17724739#comment-17724739 ]
LiuZeshan commented on FLINK-32139:
-----------------------------------

[~liyu] Would you also help to take a look at this issue?

> Data accidentally deleted and not deleted when upsert sink to hbase
> -------------------------------------------------------------------
>
>                 Key: FLINK-32139
>                 URL: https://issues.apache.org/jira/browse/FLINK-32139
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / HBase
>            Reporter: LiuZeshan
>            Priority: Major
>              Labels: pull-request-available
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> h4. *Problem background*
> We hit issues where data is accidentally deleted, or not deleted at all, when synchronizing MySQL CDC data to HBase using the HBase connector.
> h3. Reproduction steps
> 1. A Flink job with parallelism 1 synchronizes a MySQL table into HBase. SinkUpsertMaterializer is turned off by setting {{table.exec.sink.upsert-materialize = 'NONE'}}.
> The MySQL table schema is as follows.
> {code:java}
> CREATE TABLE `source_sample_1001` (
>   `id` int(11) NOT NULL AUTO_INCREMENT,
>   `name` varchar(200) DEFAULT NULL,
>   `age` int(11) DEFAULT NULL,
>   `weight` float DEFAULT NULL,
>   PRIMARY KEY (`id`)
> );{code}
> The source table definition in Flink is as follows.
> {code:java}
> CREATE TABLE `source_sample_1001` (
>   `id` bigint,
>   `name` String,
>   `age` bigint,
>   `weight` float,
>   PRIMARY KEY (`id`) NOT ENFORCED
> ) WITH (
>   'connector' = 'mysql-cdc',
>   'hostname' = '${ip}',
>   'port' = '3306',
>   'username' = '${user}',
>   'password' = '${password}',
>   'database-name' = 'testdb_0010',
>   'table-name' = 'source_sample_1001'
> );{code}
> The HBase sink table is created in the {{testdb_0011}} namespace.
> {code:java}
> create 'testdb_0011:source_sample_1001', 'data'
>
> describe 'testdb_0011:source_sample_1001'
>
> # describe output
> Table testdb_0011:source_sample_1001 is ENABLED
> testdb_0011:source_sample_1001
> COLUMN FAMILIES DESCRIPTION
> {NAME => 'data', BLOOMFILTER => 'ROW', IN_MEMORY => 'false', VERSIONS => '1', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', COMPRESSION => 'NONE', TTL => 'FOREVER', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
> {code}
> The sink table definition in Flink:
> {code:java}
> CREATE TABLE `hbase_sink1` (
>   `id` STRING COMMENT 'unique id',
>   `data` ROW<
>     `name` string,
>     `age` string,
>     `weight` string
>   >,
>   primary key(`id`) not enforced
> ) WITH (
>   'connector' = 'hbase-2.2',
>   'table-name' = 'testdb_0011:source_sample_1001',
>   'zookeeper.quorum' = '${hbase.zookeeper.quorum}'
> );{code}
> The DML in Flink to synchronize the data:
> {code:java}
> INSERT INTO `hbase_sink1`
> SELECT `id`, row(`name`, `age`, `weight`)
> FROM (
>   SELECT
>     REVERSE(CONCAT_WS('', CAST(id AS VARCHAR))) as id,
>     `name`, cast(`age` as varchar) as `age`, cast(`weight` as varchar) as `weight`
>   FROM `source_sample_1001`
> ) t;{code}
> 2. Another Flink job sinks datagen data into the MySQL table {{source_sample_1001}}. The id ranges from 1 to 10_000, so source_sample_1001 contains at most 10_000 records.
> {code:java}
> CREATE TABLE datagen_source (
>   `id` int,
>   `name` String,
>   `age` int,
>   `weight` int
> ) WITH (
>   'connector' = 'datagen',
>   'fields.id.kind' = 'random',
>   'fields.id.min' = '1',
>   'fields.id.max' = '10000',
>   'fields.name.length' = '20',
>   'fields.age.min' = '1',
>   'fields.age.max' = '150',
>   'fields.weight.min' = '5',
>   'fields.weight.max' = '300',
>   'rows-per-second' = '5000'
> );
>
> CREATE TABLE `source_sample_1001` (
>   `id` bigint,
>   `name` String,
>   `age` bigint,
>   `weight` float,
>   PRIMARY KEY (`id`) NOT ENFORCED
> ) WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:mysql://${ip}:3306/testdb_0010?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',
>   'table-name' = 'source_sample_1001',
>   'username' = '${user}',
>   'password' = '${password}',
>   'sink.buffer-flush.max-rows' = '500',
>   'sink.buffer-flush.interval' = '1s'
> );
>
> -- dml
> INSERT INTO `source_sample_1001` SELECT `id`, `name`, `age`, cast(`weight` as float) FROM `datagen_source`;{code}
> 3. A bash script deletes rows from the MySQL table {{source_sample_1001}} in batches of 10.
> {code:java}
> #!/bin/bash
>
> mysql1="mysql -h${ip} -u${user} -p${password}"
> batch=10
>
> for ((i=1; ;i++)); do
>   echo "iteration $i start"
>   for ((j=1; j<=10000; j+=10)); do
>     $mysql1 -e "delete from testdb_0010.source_sample_1001 where id >= $j and id < $((j+10))"
>   done
>   echo "iteration $i end"
>   sleep 10
> done{code}
> 4. Start the two Flink jobs above and the bash script, then wait for several minutes; usually 5 minutes is enough. Note that the deleting bash script is required to reproduce the problem.
> 5. Stop the bash script and wait until the datagen Flink job has filled the MySQL table up to 10_000 rows. Then stop the datagen job and wait for the HBase sink job to consume all of the binlog of the MySQL table {{source_sample_1001}}.
> 6. Check the HBase table: the data-loss issue reproduces. As shown below, 67 records were lost in one test.
> {code:java}
> hbase(main):006:0> count 'testdb_0011:source_sample_1001'
> 9933 row(s)
> Took 0.8724 seconds
> => 9933{code}
> Pick one of the missing records and check its raw data in HBase.
> {code:java}
> hbase(main):008:0> get 'testdb_0011:source_sample_1001', '24'
> COLUMN    CELL
> 0 row(s)
> Took 0.0029 seconds
>
> hbase(main):009:0> scan 'testdb_0011:source_sample_1001', {RAW => true, VERSIONS => 1000, STARTROW => '24', STOPROW => '24'}
> ROW    COLUMN+CELL
>  24    column=data:name, timestamp=2023-05-20T21:17:44.884, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:44.884, value=3a8f571c25a9d9040ef3
>  24    column=data:name, timestamp=2023-05-20T21:17:43.769, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:43.769, value=5aada98281ee0a961841
>  24    column=data:name, timestamp=2023-05-20T21:17:42.902, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:42.902, value=599790a9a641e6121ab3
>  24    column=data:name, timestamp=2023-05-20T21:17:41.614, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:41.614, value=4ece6410d32959457f80
>  24    column=data:name, timestamp=2023-05-20T21:17:40.885, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:40.885, value=9edcfcf1c958a7e4ae2a
>  24    column=data:name, timestamp=2023-05-20T21:17:40.841, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:40.841, value=3d82dcf982d5bcd5b6b7
>  24    column=data:name, timestamp=2023-05-20T21:17:39.788, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:39.788, value=2888a338b65caaf15b30
>  24    column=data:name, timestamp=2023-05-20T21:17:35.799, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:35.799, value=a8d7549e18ef0c0e8674
>  24    column=data:name, timestamp=2023-05-20T21:17:35.688, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:35.688, value=ada7237e52d030dcef7a
>  24    column=data:name, timestamp=2023-05-20T21:17:35.650, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:35.650, value=482feed26918dcdc911e
>  24    column=data:name, timestamp=2023-05-20T21:17:34.885, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:34.885, value=36d6bdd585dbb65dedb7
>  24    column=data:name, timestamp=2023-05-20T21:17:33.905, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:33.905, value=6e15c4462f8435040700
>  24    column=data:name, timestamp=2023-05-20T21:17:33.803, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:33.803, value=d122df5afd4eac32da72
>  24    column=data:name, timestamp=2023-05-20T21:17:33.693, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:33.693, value=ed603d47fedb3852b520
>  24    column=data:name, timestamp=2023-05-20T21:17:31.784, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:31.784, value=1ebdd5fe6310850b8098
>  24    column=data:name, timestamp=2023-05-20T21:17:30.684, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:30.684, value=cc628ba45d1ad07fce2f
>  24    column=data:name, timestamp=2023-05-20T21:17:29.812, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:29.812, value=c1d4df6e987bdb3cd0a3
>  24    column=data:name, timestamp=2023-05-20T21:17:29.590, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:29.590, value=535557700ca01c6b6b1e
>  24    column=data:name, timestamp=2023-05-20T21:17:28.876, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:28.876, value=a63c2ebfefc82eab4bcf
>  24    column=data:name, timestamp=2023-05-20T21:17:28.565, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:28.565, value=dd2b24ff0dfa672c49ba
>  24    column=data:name, timestamp=2023-05-20T21:17:27.879, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:27.879, value=69dbe1287c2bc54781ab
>  24    column=data:name, timestamp=2023-05-20T21:17:27.699, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:27.699, value=775d06dcbf1148e665ee
>  24    column=data:name, timestamp=2023-05-20T21:17:24.209, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:24.209, value=e23c010ab06125c88870
>  24    column=data:name, timestamp=2023-05-20T21:17:22.480, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:20.716, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:18.678, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:17.720, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:16.858, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:16.682, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:15.753, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:14.571, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:11.572, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:09.681, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:08.792, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:05.888, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:05.754, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:03.626, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:02.652, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:01.790, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:17:00.986, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:16:59.797, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:16:58.982, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:16:58.781, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:16:58.626, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:16:58.149, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:16:56.610, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:16:51.655, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:16:51.458, type=Delete
>  24    column=data:name, timestamp=2023-05-20T21:16:44.860, type=Delete
> 1 row(s)
> Took 0.1466 seconds
> {code}
> 7. Start the bash script again to delete all data from the MySQL table, and wait for the HBase sink job to consume all of the binlog of the MySQL table {{source_sample_1001}}.
> 8. Check the HBase table: the missed-deletion issue reproduces. As shown below, 6 records were not deleted in the test.
> {code:java}
> hbase(main):012:0> count 'testdb_0011:source_sample_1001'
> 6 row(s)
> Took 0.5121 seconds
> => 6{code}
> Check the raw data of one of the remaining records in HBase.
> {code:java}
> hbase(main):013:0> get 'testdb_0011:source_sample_1001', '3668'
> COLUMN    CELL
>  data:name    timestamp=2023-05-20T21:17:26.714, value=ebb15f905622340d0351
> 1 row(s)
> Took 0.0037 seconds
>
> hbase(main):014:0> scan 'testdb_0011:source_sample_1001', {RAW => true, VERSIONS => 1000, STARTROW => '3668', STOPROW => '3668'}
> ROW    COLUMN+CELL
>  3668    column=data:name, timestamp=2023-05-20T21:17:45.728, type=Delete
>  3668    column=data:name, timestamp=2023-05-20T21:17:45.728, value=c675a12c7cbed27599c3
>  3668    column=data:name, timestamp=2023-05-20T21:17:44.693, type=Delete
>  3668    column=data:name, timestamp=2023-05-20T21:17:44.693, value=413921aa1ac44f545954
>  3668    column=data:name, timestamp=2023-05-20T21:17:43.854, type=Delete
>  3668    column=data:name, timestamp=2023-05-20T21:17:43.854, value=7d44b0efc0923e4035b7
>  3668    column=data:name, timestamp=2023-05-20T21:17:41.721, type=Delete
>  3668    column=data:name, timestamp=2023-05-20T21:17:41.721, value=60bfaef81bf8efdf781a
>  3668    column=data:name, timestamp=2023-05-20T21:17:40.763, type=Delete
>  3668    column=data:name, timestamp=2023-05-20T21:17:40.763, value=2c371f9cd3909dd3b3f8
>  3668    column=data:name, timestamp=2023-05-20T21:17:37.872, type=Delete
>  3668    column=data:name, timestamp=2023-05-20T21:17:37.872, value=9e32087cb39065976e50
>  3668    column=data:name, timestamp=2023-05-20T21:17:32.573, type=Delete
>  3668    column=data:name, timestamp=2023-05-20T21:17:32.573, value=708364bf84dad4a04170
>  3668    column=data:name, timestamp=2023-05-20T21:17:26.811, type=Delete
>  3668    column=data:name, timestamp=2023-05-20T21:17:26.811, value=c0e8e11eed3f8410dea9
>  3668    column=data:name, timestamp=2023-05-20T21:17:26.714, value=ebb15f905622340d0351
>  3668    column=data:name, timestamp=2023-05-20T21:17:24.310, type=Delete
>  3668    column=data:name, timestamp=2023-05-20T21:17:24.310, value=21681a161ed2ccbe884e
>  3668    column=data:name, timestamp=2023-05-20T21:17:23.508, type=Delete
>  3668    column=data:name, timestamp=2023-05-20T21:17:23.508, value=a1ef547a9efd57a7a0e2
>  3668    column=data:name, timestamp=2023-05-20T21:17:22.788, type=Delete
>  3668    column=data:name, timestamp=2023-05-20T21:17:22.788, value=34e688060e6c40f4f83b
>  3668    column=data:name, timestamp=2023-05-20T21:17:21.746, type=Delete
>  3668    column=data:name, timestamp=2023-05-20T21:17:17.761, type=Delete
>  3668    column=data:name, timestamp=2023-05-20T21:17:12.610, type=Delete
>  3668    column=data:name, timestamp=2023-05-20T21:17:11.909, type=Delete
>  3668    column=data:name, timestamp=2023-05-20T21:17:07.846, type=Delete
>  3668    column=data:name, timestamp=2023-05-20T21:17:06.901, type=Delete
>  3668    column=data:name, timestamp=2023-05-20T21:17:06.758, type=Delete
>  3668    column=data:name, timestamp=2023-05-20T21:17:06.569, type=Delete
>  3668    column=data:name, timestamp=2023-05-20T21:17:02.689, type=Delete
>  3668    column=data:name, timestamp=2023-05-20T21:17:00.344, type=Delete
>  3668    column=data:name, timestamp=2023-05-20T21:16:59.961, type=Delete
>  3668    column=data:name, timestamp=2023-05-20T21:16:59.415, type=Delete
>  3668    column=data:name, timestamp=2023-05-20T21:16:58.916, type=Delete
>  3668    column=data:name, timestamp=2023-05-20T21:16:58.781, type=Delete
>  3668    column=data:name, timestamp=2023-05-20T21:16:58.718, type=Delete
>  3668    column=data:name, timestamp=2023-05-20T21:16:58.339, type=Delete
>  3668    column=data:name, timestamp=2023-05-20T21:16:56.340, type=Delete
>  3668    column=data:name, timestamp=2023-05-20T21:16:55.883, type=Delete
>  3668    column=data:name, timestamp=2023-05-20T21:16:55.683, type=Delete
>  3668    column=data:name, timestamp=2023-05-20T21:16:55.056, type=Delete
>  3668    column=data:name, timestamp=2023-05-20T21:16:46.845, type=Delete
> 1 row(s)
> Took 0.0457 seconds
> {code}
> h4. *Reason for the problem*
> The [HBase connector|https://github.com/apache/flink/blob/06688f345f6793a8964ec00002175f44cda13c33/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java#L177] uses the [Delete key type|https://github.com/apache/hbase/blob/c05ee564d3026688bcfdc456071059c7c8409694/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java#L380] [without a timestamp|https://github.com/apache/flink/blob/06688f345f6793a8964ec00002175f44cda13c33/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java#L168], which will {{delete the latest version of the specified column. This is an expensive call in that on the server-side, it first does a get to find the latest versions timestamp. Then it adds a delete using the fetched cells timestamp}}. This causes the following issues:
> Problem 1: when writing an update, the timestamps assigned by the HBase server to the -U and +U mutations may be identical, so the -U delete removes the latest version that +U just wrote, and the data is accidentally deleted. The same problem was also reported in https://issues.apache.org/jira/browse/FLINK-28910.
> Problem 2: when a column has multiple versions in HBase, deleting only the latest version exposes an earlier version, so the row appears not to be deleted.
> h4. *Solution proposal*
> Use the [DeleteColumn key type|https://github.com/apache/hbase/blob/c05ee564d3026688bcfdc456071059c7c8409694/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java#L322] and set a strictly increasing timestamp on every [put|https://github.com/lzshlzsh/flink/blob/a2341810a244b97a3af32951e17efbc49f570cdd/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java#L138] and [delete|https://github.com/lzshlzsh/flink/blob/a2341810a244b97a3af32951e17efbc49f570cdd/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java#L170] mutation. The delete mutation then removes all versions of the specified column with a timestamp less than or equal to the specified one; a minimal sketch of the idea is attached after this message.
> I have tested the proposed solution for several days, and neither the accidental-deletion nor the missed-deletion issue occurred.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
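To make the proposal concrete, here is a minimal, self-contained sketch of the idea (class and method names are illustrative only; this is not the actual HBaseSerde patch): stamp every put and delete with a strictly increasing timestamp and issue deletes with DeleteColumn semantics via {{Delete#addColumns}}.

{code:java}
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

/** Illustrative sketch only; not the actual Flink HBaseSerde code. */
public class UpsertMutationSketch {

    private static final byte[] FAMILY = Bytes.toBytes("data");
    private static final byte[] QUALIFIER = Bytes.toBytes("name");

    // Monotonic clock: never hands out the same timestamp twice, so a -U
    // delete can never collide with the timestamp of the put before it.
    private long lastTimestamp = 0L;

    private long nextTimestamp() {
        lastTimestamp = Math.max(System.currentTimeMillis(), lastTimestamp + 1);
        return lastTimestamp;
    }

    /** +I / +U: put with an explicit, strictly increasing timestamp. */
    public Put createPut(byte[] rowKey, byte[] value) {
        long ts = nextTimestamp();
        Put put = new Put(rowKey, ts);
        put.addColumn(FAMILY, QUALIFIER, ts, value);
        return put;
    }

    /**
     * -U / -D: addColumns(...) writes a DeleteColumn marker that removes all
     * versions of the column with timestamp <= ts. This avoids addColumn(...)
     * without a timestamp, which only deletes the latest version and needs a
     * server-side get to find that version's timestamp.
     */
    public Delete createDelete(byte[] rowKey) {
        long ts = nextTimestamp();
        Delete delete = new Delete(rowKey, ts);
        delete.addColumns(FAMILY, QUALIFIER, ts);
        return delete;
    }
}
{code}

With explicit, strictly increasing timestamps a -U delete always carries a smaller timestamp than the +U put that follows it, so it can no longer wipe out the new version; and because the DeleteColumn marker covers every version at or below its timestamp, deleting a row no longer exposes older versions.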