Jasmin Redzepovic created FLINK-33293:
-----------------------------------------
Summary: Data loss with Kafka Sink
Key: FLINK-33293
URL: https://issues.apache.org/jira/browse/FLINK-33293
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.16.2
Reporter: Jasmin Redzepovic
More info in the Slack discussion:
[https://www.linen.dev/s/apache-flink/t/15851877/hi-all-it-s-me-again-based-on-https-apache-flink-slack-com-a#e102fa98-dcd7-40e8-a2c4-f7b4a83234e1]
*TLDR:*
(I was unable to reproduce the issue in Flink 1.15, but I can reproduce it in 1.16 and 1.17.)
I created a sink topic with 8 partitions, a replication factor of 3, and {{min.insync.replicas}} set to 2. The consumer properties are left at their default values.
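For completeness, the topic setup looks roughly like the sketch below, using the Kafka AdminClient; the broker address and topic name are placeholders, not the actual values:

{code:java}
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateSinkTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            // 8 partitions, replication factor 3, min.insync.replicas 2
            NewTopic topic = new NewTopic("sink-topic", 8, (short) 3) // "sink-topic" is a placeholder
                    .configs(Map.of("min.insync.replicas", "2"));
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}
{code}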
For the producer, I changed the {{delivery.timeout.ms}} and {{request.timeout.ms}} properties, setting them to *5000ms* and *4000ms* respectively. ({{acks}} is set to -1 by default, which I believe is equivalent to _all_.)
KafkaSink is configured with an AT_LEAST_ONCE delivery guarantee. The job parallelism is set to 1 and the checkpointing interval is set to 2000ms. I started a Flink job and monitored its logs. In parallel, I was consuming the {{__consumer_offsets}} topic to track when offsets are committed for my consumer group.
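The job is wired up roughly like the sketch below. This is not the actual job: the bootstrap servers, topic name, and the bounded example source are placeholders (the real job consumes from Kafka, hence the consumer group mentioned above), but the sink and checkpointing settings match what I described:

{code:java}
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSinkRepro {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Checkpoint every 2000ms
        env.enableCheckpointing(2000);

        // Shortened producer timeouts: delivery.timeout.ms=5000, request.timeout.ms=4000
        Properties producerProps = new Properties();
        producerProps.setProperty("delivery.timeout.ms", "5000");
        producerProps.setProperty("request.timeout.ms", "4000");

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")  // placeholder
                .setKafkaProducerConfig(producerProps)
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("sink-topic")  // placeholder
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        env.fromElements("a", "b", "c")  // placeholder source
                .sinkTo(sink);

        env.execute("kafka-sink-repro");
    }
}
{code}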
The problematic part occurred during checkpoint 5. Its duration was 5009ms, which exceeds Kafka's delivery timeout (5000ms). Although the checkpoint was marked as completed, I believe that the KafkaSink output buffer was not fully acknowledged by Kafka. As a result, Flink proceeded to trigger checkpoint 6 but immediately encountered a Kafka _TimeoutException: Expiring N records_.
I suspect that this exception originated from checkpoint 5 and that checkpoint
5 should not have been considered successful. The job then failed but recovered
from checkpoint 5. Some time after checkpoint 7, consumer offsets were
committed to Kafka, and this process repeated once more at checkpoint 9.
Since the offsets from checkpoint 5 were committed to Kafka but its output buffer was only partially delivered, data has been lost. I confirmed this when sinking the topic to the database.