[ 
https://issues.apache.org/jira/browse/FLINK-33293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17776853#comment-17776853
 ] 

Mason Chen commented on FLINK-33293:
------------------------------------

Hi, this case should be handled by FLINK-31305 as [~tzulitai] mentioned. On 
delivery timeout, the flush should fail and restart from the previous 
checkpoint. I have confirmed that this is correct from your logs (KafkaSink 
threw exception, failed the checkpoint, and subsequently restarted the job). 

 

However, there is something suspicious with the restart, unrelated to the 
KafkaSink.

```

21:46:12,793 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] 
- Restoring job f543de153e861f822176132218942c8d from Checkpoint 5 @ 
1695671165746 for f543de153e861f822176132218942c8d located at 
<checkpoint-not-externally-addressable>. 21:46:12,817 INFO 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No master state 
to restore

```

Is it possible that the job did not recover from checkpoint on the restart? 
That would violate at least once semantics.

> Data loss with Kafka Sink
> -------------------------
>
>                 Key: FLINK-33293
>                 URL: https://issues.apache.org/jira/browse/FLINK-33293
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.16.1, 1.16.2, 1.17.1
>            Reporter: Jasmin Redzepovic
>            Priority: Major
>         Attachments: job.log
>
>
> More info in Slack discussion: 
> [https://www.linen.dev/s/apache-flink/t/15851877/hi-all-it-s-me-again-based-on-https-apache-flink-slack-com-a#e102fa98-dcd7-40e8-a2c4-f7b4a83234e1]
>  
> *TLDR:*
> (in Flink version 1.15 I was unable to reproduce the issue, but in 1.16 and 
> 1.17 I can reproduce it)
> I have created a sink topic with 8 partitions, a replication factor of 3, and 
> a minimum in-sync replicas of 2. The consumer properties are set to their 
> default values.
> For the producer, I made changes to the delivery.timeout.ms and 
> request.timeout.ms properties, setting them to *5000ms* and *4000ms* 
> respectively. (acks are set to -1 by default, which is equals to _all_ I 
> guess)
> KafkaSink is configured with an AT_LEAST_ONCE delivery guarantee. The job 
> parallelism is set to 1 and the checkpointing interval is set to 2000ms. I 
> started a Flink Job and monitored its logs. Additionally, I was consuming the 
> __consumer_offsets topic in parallel to track when offsets are committed for 
> my consumer group.
> The problematic part occurs during checkpoint 5. Its duration was 5009ms, 
> which exceeds the delivery timeout for Kafka (5000ms). Although it was marked 
> as completed, I believe that the output buffer of KafkaSink was not fully 
> acknowledged by Kafka. As a result, Flink proceeded to trigger checkpoint 6 
> but immediately encountered a Kafka {_}TimeoutException: Expiring N 
> records{_}.
> I suspect that this exception originated from checkpoint 5 and that 
> checkpoint 5 should not have been considered successful. The job then failed 
> but recovered from checkpoint 5. Some time after checkpoint 7, consumer 
> offsets were committed to Kafka, and this process repeated once more at 
> checkpoint 9.
> Since the offsets of checkpoint 5 were committed to Kafka, but the output 
> buffer was only partially delivered, there has been data loss. I confirmed 
> this when sinking the topic to the database.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to