[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863737#comment-17863737
 ] 

Arvid Heise commented on FLINK-33545:
-------------------------------------

Sorry for being late to this party. But I wonder if this fix isn't to strict. 
When it fails, it requires a restart where we probably have an easier way to 
ensure correctness without failing.
I think we haven't fully understood the problem yet and implement some 
workaround. If we look at the `flush` JavaDoc of KafkaProducer, we get

{noformat}
Invoking this method makes all buffered records immediately available to send 
(even if linger. ms is greater than 0) and blocks on the completion of the 
requests associated with these records. The post-condition of flush() is that 
any previously sent record will have completed (e. g. Future. isDone() == 
true). A request is considered completed when it is successfully acknowledged 
according to the acks configuration you have specified or else it results in an 
error.
Other threads can continue sending records while one thread is blocked waiting 
for a flush call to complete, however no guarantee is made about the completion 
of records sent after the flush call begins.
This method can be useful when consuming from some input system and producing 
into Kafka. The flush() call gives a convenient way to ensure all previously 
sent messages have actually completed.
This example shows how to consume from one Kafka topic and produce to another 
Kafka topic:
  for(ConsumerRecord<String, String> record: consumer. poll(100))
     producer. send(new ProducerRecord("my-topic", record. key(), record. 
value());
 producer. flush();
 consumer. commitSync();
 
 
Note that the above example may drop records if the produce request fails. If 
we want to ensure that this does not occur we need to set 
retries=<large_number> in our config.
Applications don't need to call this method for transactional producers, since 
the commitTransaction() will flush all buffered records before performing the 
commit. This ensures that all the send(ProducerRecord) calls made since the 
previous beginTransaction() are completed before the commit
{noformat}

That means that `flush` is guaranteed to get an ack on all pending messages. So 
to lose data, we either call `flush` out of order or there is a bug in `flush`.

We also know that Flink sink sees the barrier only after processing all 100 
records (in the initial example). Both prepareSnapshotPreBarrier and the actual 
snapshot happen in the same root call. No mailbox letter can be interleaved at 
this point. So Flink should flush all 100 records if `flush` is not buggy.

One oddity about flush is that flush can be called concurrently. Look at how 
flush is implemented in KafkaProducer.
{code:java}
    public void flush() {
        log.trace("Flushing accumulated records in producer.");

        long start = time.nanoseconds();
        this.accumulator.beginFlush();
        this.sender.wakeup();
        try {
            this.accumulator.awaitFlushCompletion();
        } catch (InterruptedException e) {
            throw new InterruptException("Flush interrupted.", e);
        } finally {
            producerMetrics.recordFlush(time.nanoseconds() - start);
        }
    }
    public void awaitFlushCompletion() throws InterruptedException {
        try {
            // Obtain a copy of all of the incomplete ProduceRequestResult(s) 
at the time of the flush.
            // We must be careful not to hold a reference to the 
ProduceBatch(s) so that garbage
            // collection can occur on the contents.
            // The sender will remove ProducerBatch(s) from the original 
incomplete collection.
            for (ProduceRequestResult result : this.incomplete.requestResults())
                result.await();
        } finally {
            this.flushesInProgress.decrementAndGet();
        }
    }
{code}

So there is a chance that we have multiple flushesInProgress and our call to 
`flush` returns prematurely. However, I don't spot the issue immediately.

One thing that would certainly help, would be to get some debug/trace logs of 
the actual data loss.

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> -------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33545
>                 URL: https://issues.apache.org/jira/browse/FLINK-33545
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.18.0
>            Reporter: Kevin Tseng
>            Assignee: Kevin Tseng
>            Priority: Major
>              Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader<T>* class
>  # KafkaSink in *KafkaWriter<IN>* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer<String, String> producer = new KafkaProducer<>(props);
> try {
>     for (int i = 0; i < 10; i++) {
>         System.out.printf("sending record #%d\n", i);
>         String data = UUID.randomUUID().toString();
>         final ProducerRecord<String, String> record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
>         producer.send(record, new CB(Integer.toString(i), data));
>         Thread.sleep(10000); //sleep for 10 seconds
>     }
> } catch (Exception e) {
>     e.printStackTrace();
> } finally {
>     System.out.println("flushing");
>     producer.flush();
>     System.out.println("closing");
>     producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This will result in dataloss of record #97 to #100
> Because KafkaWriter only catches error *after* callback, if callback is never 
> invoked (due to broker issue) right after the first flush has taken place, 
> those records are effectively gone unless someone decided to go back and look 
> for it.
> This behavior should be ok if user has set {*}DeliveryGuarantee.NONE{*}, but 
> is not expected for {*}DeliveryGuarantee.AT_LEAST_ONCE{*}.
> There is a divergence of the process in the event of {*}EXACTLY_ONCE{*}.
> prepareCommit will produce a list of KafkaCommittable that corresponds to 
> Transactional KafkaProducer to be committed. And a catch up flush will take 
> place during *commit* step. Whether this was intentional or not, due to the 
> fact that flush is a blocking call, the second flush for EXACTLY_ONCE at the 
> end of EXACTLY_ONCE actually ensured everything fenced in the current 
> checkpoint will be sent to Kafka, or fail the checkpoint if not successful.
>  
> Due the above finding, I'm recommending one of the following fixes:
>  # need to perform second flush for AT_LEAST_ONCE
>  # or move flush to the end of the KafkaSink process.
> I'm leaning towards 2nd option as it does not make sense to flush then do 
> checkpoint, it should be right before checkpoint completes then we flush, 
> given that's what commit is meant to do.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to