Jasmin Redzepovic created FLINK-33293:
-----------------------------------------

             Summary: Data loss with Kafka Sink
                 Key: FLINK-33293
                 URL: https://issues.apache.org/jira/browse/FLINK-33293
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: 1.16.2
            Reporter: Jasmin Redzepovic


More info in Slack discussion: 
[https://www.linen.dev/s/apache-flink/t/15851877/hi-all-it-s-me-again-based-on-https-apache-flink-slack-com-a#e102fa98-dcd7-40e8-a2c4-f7b4a83234e1]

 

*TLDR:*

(I was unable to reproduce the issue on Flink 1.15, but I can reproduce it on 1.16 and 1.17.)

I have created a sink topic with 8 partitions, a replication factor of 3, and a 
minimum in-sync replicas of 2. The consumer properties are set to their default 
values.
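
A minimal sketch of the sink-topic setup described above (the broker address and topic name are placeholders I made up; the relevant settings are 8 partitions, replication factor 3 and {{min.insync.replicas=2}}):

{code:java}
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class CreateSinkTopic {
    public static void main(String[] args) throws Exception {
        Properties adminProps = new Properties();
        adminProps.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker

        try (AdminClient admin = AdminClient.create(adminProps)) {
            // 8 partitions, replication factor 3, min.insync.replicas=2
            NewTopic sinkTopic = new NewTopic("sink-topic", 8, (short) 3)
                    .configs(Map.of("min.insync.replicas", "2"));
            admin.createTopics(Collections.singleton(sinkTopic)).all().get();
        }
    }
}
{code}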

For the producer, I changed the {{delivery.timeout.ms}} and {{request.timeout.ms}} properties, setting them to *5000ms* and *4000ms* respectively (these overrides also appear in the job sketch below). {{acks}} is left at its default of -1, which I believe is equivalent to _all_.

KafkaSink is configured with an AT_LEAST_ONCE delivery guarantee. The job 
parallelism is set to 1 and the checkpointing interval is set to 2000ms. I 
started a Flink Job and monitored its logs. Additionally, I was consuming the 
{{__consumer_offsets}} topic in parallel to track when offsets are committed 
for my consumer group.
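
A minimal sketch of how the job is wired up (broker address, topic name and the inline test elements are placeholders; the real job reads from a Kafka source with default consumer properties):

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

public class KafkaSinkJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(2000); // 2000ms checkpointing interval

        // Producer overrides described above; acks stays at its default of -1 ("all").
        Properties producerProps = new Properties();
        producerProps.setProperty("delivery.timeout.ms", "5000");
        producerProps.setProperty("request.timeout.ms", "4000");

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092") // placeholder broker
                .setKafkaProducerConfig(producerProps)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("sink-topic") // placeholder topic name
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        // Stand-in for the actual Kafka source in the real job.
        env.fromElements("a", "b", "c").sinkTo(sink);
        env.execute("kafka-sink-at-least-once-sketch");
    }
}
{code}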

The problematic part occurred during checkpoint 5. Its duration was 5009ms, which 
exceeds Kafka's delivery timeout (5000ms). Although the checkpoint was marked as 
completed, I believe that the output buffer of the KafkaSink was not fully 
acknowledged by Kafka. As a result, Flink proceeded to trigger checkpoint 6 but 
immediately encountered a Kafka _TimeoutException: Expiring N records_.

I suspect that this exception originated from checkpoint 5 and that checkpoint 
5 should not have been considered successful. The job then failed but recovered 
from checkpoint 5. Some time after checkpoint 7, consumer offsets were 
committed to Kafka, and this process repeated once more at checkpoint 9.

Since the offsets of checkpoint 5 were committed to Kafka but the output 
buffer was only partially delivered, data was lost. I confirmed this by 
sinking the topic to the database.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
