xuzhiwen1255 opened a new issue, #6531:
URL: https://github.com/apache/iceberg/issues/6531
### Apache Iceberg version
1.1.0 (latest release)
### Query engine
Flink
### Please describe the bug 🐞
After enabling upsert on my Flink table, I wrote 3 million rows into it in advance and then read the table in streaming mode. As a result, every checkpoint fails. Once all the existing data in the table has been read, checkpoints work properly again.
According to my analysis, the upstream monitor sends splits that fill up the pipeline, so the reader operator does not receive the checkpoint barrier in time. As a result, the checkpoint cannot start, and it eventually times out.
Like the following:

    --------------------------------------------------
    barrier, split, split, split, split, split, split, split | ---
    split ---> reader : processing split --> read file
    --------------------------------------------------
    .. processing ..
    --------------------------------------------------
    split, barrier, split, split, split, barrier, split, split | ---
    split ---> reader : processing barrier --> trigger checkpoint
    --------------------------------------------------
The barrier is only processed, and the checkpoint only triggered, after all the splits ahead of it have been processed.
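A checkpoint in this situation fails once the barrier has waited longer than Flink's `execution.checkpointing.timeout` (10 minutes by default). The report does not show the job's checkpoint settings, so the values below are assumptions, not the reporter's configuration; a hypothetical Flink SQL client setup for reproducing the behavior might look like this:

```sql
-- Hypothetical checkpoint settings (assumed, not taken from the report).
-- The Iceberg sink commits when a checkpoint completes, so checkpointing
-- must be enabled for the streaming insert to produce commits.
SET 'execution.checkpointing.interval' = '1min';
-- A checkpoint that does not complete within this time is declared failed.
SET 'execution.checkpointing.timeout' = '10min';
```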
In my tests, because upsert is enabled, a large amount of delete data is generated, so each split takes a long time to process. The reader operator therefore processes data slowly, which causes backpressure.
Splits are read on the Flink task thread. Because that same thread also has to handle the checkpoint barrier, a slow split read blocks the checkpoint.
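One way to check how much delete data the upsert writes produced is to count files by content type in the table's `files` metadata table. This is a sketch that assumes the Iceberg Flink integration in use exposes metadata tables through the `$files` name suffix; the `content` values follow the Iceberg spec (0 = data, 1 = position deletes, 2 = equality deletes).

```sql
-- Run as a bounded batch query so the aggregate is emitted once.
SET 'execution.runtime-mode' = 'batch';

-- Assumes Flink SQL metadata-table support (table name suffixed with $files).
-- content: 0 = data files, 1 = position delete files, 2 = equality delete files
SELECT content, COUNT(*) AS file_count, SUM(record_count) AS total_records
FROM hc.db.`test1$files`
GROUP BY content;
```

A large number of equality-delete files relative to data files would be consistent with the slow split processing described above.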
Here is my SQL:
```sql
CREATE TABLE dg (
  id INT, c1 VARCHAR, c2 VARCHAR, c3 VARCHAR, c4 VARCHAR, c5 VARCHAR, c6 VARCHAR,
  c7 VARCHAR, c8 VARCHAR, c9 VARCHAR, c10 VARCHAR, c11 VARCHAR, c12 VARCHAR,
  c13 VARCHAR, c14 VARCHAR, c15 VARCHAR, c16 VARCHAR, c17 VARCHAR, c18 VARCHAR,
  c19 VARCHAR, c20 VARCHAR, c21 VARCHAR, c22 VARCHAR, c23 VARCHAR, c24 VARCHAR,
  c25 VARCHAR, c26 VARCHAR, c27 VARCHAR, c28 VARCHAR, c29 VARCHAR, c30 VARCHAR,
  c31 VARCHAR, c32 VARCHAR
)
WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10000',
  'number-of-rows' = '300000',
  'fields.id.min' = '0',
  'fields.id.max' = '50000001',
  'fields.c1.length' = '20', 'fields.c2.length' = '20', 'fields.c3.length' = '20', 'fields.c4.length' = '20',
  'fields.c5.length' = '20', 'fields.c6.length' = '20', 'fields.c7.length' = '20', 'fields.c8.length' = '20',
  'fields.c9.length' = '20', 'fields.c10.length' = '20', 'fields.c11.length' = '20', 'fields.c12.length' = '20',
  'fields.c13.length' = '20', 'fields.c14.length' = '20', 'fields.c15.length' = '20', 'fields.c16.length' = '20',
  'fields.c17.length' = '20', 'fields.c18.length' = '20', 'fields.c19.length' = '20', 'fields.c20.length' = '20',
  'fields.c21.length' = '20', 'fields.c22.length' = '20', 'fields.c23.length' = '20', 'fields.c24.length' = '20',
  'fields.c25.length' = '20', 'fields.c26.length' = '20', 'fields.c27.length' = '20', 'fields.c28.length' = '20',
  'fields.c29.length' = '20', 'fields.c30.length' = '20', 'fields.c31.length' = '20', 'fields.c32.length' = '20'
);
create table hc.db.test1 (
  id INT, c1 VARCHAR, c2 VARCHAR, c3 VARCHAR, c4 VARCHAR, c5 VARCHAR, c6 VARCHAR,
  c7 VARCHAR, c8 VARCHAR, c9 VARCHAR, c10 VARCHAR, c11 VARCHAR, c12 VARCHAR,
  c13 VARCHAR, c14 VARCHAR, c15 VARCHAR, c16 VARCHAR, c17 VARCHAR, c18 VARCHAR,
  c19 VARCHAR, c20 VARCHAR, c21 VARCHAR, c22 VARCHAR, c23 VARCHAR, c24 VARCHAR,
  c25 VARCHAR, c26 VARCHAR, c27 VARCHAR, c28 VARCHAR, c29 VARCHAR, c30 VARCHAR,
  c31 VARCHAR, c32 VARCHAR,
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'format-version' = '2',
  'write.upsert.enabled' = 'true'
);
create table hc.db.test4 (
  id INT, c1 VARCHAR, c2 VARCHAR, c3 VARCHAR, c4 VARCHAR, c5 VARCHAR, c6 VARCHAR,
  c7 VARCHAR, c8 VARCHAR, c9 VARCHAR, c10 VARCHAR, c11 VARCHAR, c12 VARCHAR,
  c13 VARCHAR, c14 VARCHAR, c15 VARCHAR, c16 VARCHAR, c17 VARCHAR, c18 VARCHAR,
  c19 VARCHAR, c20 VARCHAR, c21 VARCHAR, c22 VARCHAR, c23 VARCHAR, c24 VARCHAR,
  c25 VARCHAR, c26 VARCHAR, c27 VARCHAR, c28 VARCHAR, c29 VARCHAR, c30 VARCHAR,
  c31 VARCHAR, c32 VARCHAR,
  PRIMARY KEY (`id`) NOT ENFORCED
) with (
  'format-version' = '2',
  'write.upsert.enabled' = 'true'
);
-- streaming read
insert into hc.db.test4 select * from hc.db.test1 /*+ OPTIONS('streaming'='true', 'monitor-interval'='10s') */;
```
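The report says the data was written in advance, but that statement is not part of the SQL above. Assuming the rows were loaded from the `dg` datagen table into `hc.db.test1` before the streaming read was started, the pre-load step would look something like this (an assumed reconstruction, not the reporter's exact statement):

```sql
-- Assumed pre-load step (not shown in the original report):
-- copy the generated rows from the datagen table into the upsert-enabled Iceberg table.
INSERT INTO hc.db.test1 SELECT * FROM dg;
```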

