[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863951#comment-17863951
 ] 

Kevin Tseng commented on FLINK-33545:
-------------------------------------

Hi [~arvid] , [~masc] , [~thomasWeise] 

I agree that on the surface it appeared there's no way record can be missed. 
And this issue is not a frequent occurrence.

I was able to narrow down the requirement where this has a high chance to take 
place (not 100%)
 # Kafka cluster not fully stable either due to network issue / cluster upgrade
 # batching is enabled by setting "linger.ms" to anything other than 0   << if 
we disable batching, this issue goes away altogether >>
 # massive data load taking place at kafka (millions of record being processed 
with our testing)
 # ack is set to all

Due to the condition required for this to take place, I'm not able to carry out 
test at-will to generate needed behavior, we simply observed same problem 
happening multiple times throughout the years we left Flink running.

This was observed in Flink 1.16, 1.17 with Kafka Connector 3.0.

We have never observed this while using FlinkKafkaProducer in the past, and 
only started seeing this after switching to KafkaSink.

If we compared the implementation between FlinkKafkaProducer & KafkaSink, 
there's inherent assumption that all records are sent within KafkaSink, whereas 
FlinkKafkaProducer utilized callback with counter to ensure everything is done.

Therefore, I thought the best course of action without having a complete 
understanding why this happened would be the implementation that was also 
suggested by [~masc]: "The best thing we can do here is to fail and replay from 
last checkpoint, to maintain at least/exactly once semantics. This is also 
inline with how FlinkKafkaProducer has implemented this."

But this change would only make sense if there's any possibility that some 
record could be sent after flush has been triggered, or flush returned without 
fully committed all messages. I am inclined to believe all records have been 
produced correctly and blocking are done in the correct thread, but evidence so 
far said otherwise.

I have not conducted further test on this since last committed, and have only 
applied our own workaround & fix locally to our instances.

The issue: https://issues.apache.org/jira/browse/FLINK-35749 may also have been 
the culprit of this problem

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> -------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33545
>                 URL: https://issues.apache.org/jira/browse/FLINK-33545
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.18.0
>            Reporter: Kevin Tseng
>            Assignee: Kevin Tseng
>            Priority: Major
>              Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader<T>* class
>  # KafkaSink in *KafkaWriter<IN>* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer<String, String> producer = new KafkaProducer<>(props);
> try {
>     for (int i = 0; i < 10; i++) {
>         System.out.printf("sending record #%d\n", i);
>         String data = UUID.randomUUID().toString();
>         final ProducerRecord<String, String> record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
>         producer.send(record, new CB(Integer.toString(i), data));
>         Thread.sleep(10000); //sleep for 10 seconds
>     }
> } catch (Exception e) {
>     e.printStackTrace();
> } finally {
>     System.out.println("flushing");
>     producer.flush();
>     System.out.println("closing");
>     producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This will result in dataloss of record #97 to #100
> Because KafkaWriter only catches error *after* callback, if callback is never 
> invoked (due to broker issue) right after the first flush has taken place, 
> those records are effectively gone unless someone decided to go back and look 
> for it.
> This behavior should be ok if user has set {*}DeliveryGuarantee.NONE{*}, but 
> is not expected for {*}DeliveryGuarantee.AT_LEAST_ONCE{*}.
> There is a divergence of the process in the event of {*}EXACTLY_ONCE{*}.
> prepareCommit will produce a list of KafkaCommittable that corresponds to 
> Transactional KafkaProducer to be committed. And a catch up flush will take 
> place during *commit* step. Whether this was intentional or not, due to the 
> fact that flush is a blocking call, the second flush for EXACTLY_ONCE at the 
> end of EXACTLY_ONCE actually ensured everything fenced in the current 
> checkpoint will be sent to Kafka, or fail the checkpoint if not successful.
>  
> Due the above finding, I'm recommending one of the following fixes:
>  # need to perform second flush for AT_LEAST_ONCE
>  # or move flush to the end of the KafkaSink process.
> I'm leaning towards 2nd option as it does not make sense to flush then do 
> checkpoint, it should be right before checkpoint completes then we flush, 
> given that's what commit is meant to do.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to