[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2024-07-09 Thread Thomas Weise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17864421#comment-17864421
 ] 

Thomas Weise commented on FLINK-33545:
--

I agree that even if we fix the underlying bug in FLINK-35749 it would still be 
useful to have this guard to detect situations where otherwise loss of data 
would be hard to spot. It could also be optional behind a flag, if the overhead 
was significant or of concern. And [~arvid] glad to see you back active in the 
Flink community!

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This will result in dataloss of record #97 to #100
> Because KafkaWriter only catches error *after* callback, if callback is never 
> invoked (due to broker issue) right after the first flush has taken place, 
> those records are effectively gone unless someone decided to go back and look 
> for it.
> This behavior should be ok if user has set {*}DeliveryGuarantee.NONE{*}, but 
> is not expected for {*}DeliveryGuarantee.AT_LEAST_ONCE{*}.
> There is a divergence of the process in the event of {*}EXACTLY_ONCE{*}.
> prepareCommit will produce a list of KafkaCommittable that corresponds to 
> Transactional KafkaProducer to be committed. And a catch up flush will take 
> place during *commit* step. Whether this was intentional or not, due to the 
> fact that flush is a blocking call, the second flush for EXACTLY_ONCE at the 
> end of EXACTLY_ONCE actually ensured everything fenced in the 

[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2024-07-09 Thread Arvid Heise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17864053#comment-17864053
 ] 

Arvid Heise commented on FLINK-33545:
-

Okay thanks for being open. I also think that 
https://issues.apache.org/jira/browse/FLINK-35749 is the most likely culprit. 
It would entirely explain the issue.

If we still would like to add your safety net, I'm fine with it. It's not 
super-expensive to calculate (although it needs to use atomics) and may avoid 
data loss in the future also for new regressions. I just wanted to make sure 
that we have a good hypothesis on where the actual issue resides and I'm happy 
with the current explanation.

I'm adding a suggestion to your PR on the error message.

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This will result in dataloss of record #97 to #100
> Because KafkaWriter only catches error *after* callback, if callback is never 
> invoked (due to broker issue) right after the first flush has taken place, 
> those records are effectively gone unless someone decided to go back and look 
> for it.
> This behavior should be ok if user has set {*}DeliveryGuarantee.NONE{*}, but 
> is not expected for {*}DeliveryGuarantee.AT_LEAST_ONCE{*}.
> There is a divergence of the process in the event of {*}EXACTLY_ONCE{*}.
> prepareCommit will produce a list of KafkaCommittable that corresponds to 
> Transactional KafkaProducer to be committed. And a catch up flush will take 

[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2024-07-08 Thread Kevin Tseng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863951#comment-17863951
 ] 

Kevin Tseng commented on FLINK-33545:
-

Hi [~arvid] , [~masc] , [~thomasWeise] 

I agree that on the surface it appeared there's no way record can be missed. 
And this issue is not a frequent occurrence.

I was able to narrow down the requirement where this has a high chance to take 
place (not 100%)
 # Kafka cluster not fully stable either due to network issue / cluster upgrade
 # batching is enabled by setting "linger.ms" to anything other than 0   << if 
we disable batching, this issue goes away altogether >>
 # massive data load taking place at kafka (millions of record being processed 
with our testing)
 # ack is set to all

Due to the condition required for this to take place, I'm not able to carry out 
test at-will to generate needed behavior, we simply observed same problem 
happening multiple times throughout the years we left Flink running.

This was observed in Flink 1.16, 1.17 with Kafka Connector 3.0.

We have never observed this while using FlinkKafkaProducer in the past, and 
only started seeing this after switching to KafkaSink.

If we compared the implementation between FlinkKafkaProducer & KafkaSink, 
there's inherent assumption that all records are sent within KafkaSink, whereas 
FlinkKafkaProducer utilized callback with counter to ensure everything is done.

Therefore, I thought the best course of action without having a complete 
understanding why this happened would be the implementation that was also 
suggested by [~masc]: "The best thing we can do here is to fail and replay from 
last checkpoint, to maintain at least/exactly once semantics. This is also 
inline with how FlinkKafkaProducer has implemented this."

But this change would only make sense if there's any possibility that some 
record could be sent after flush has been triggered, or flush returned without 
fully committed all messages. I am inclined to believe all records have been 
produced correctly and blocking are done in the correct thread, but evidence so 
far said otherwise.

I have not conducted further test on this since last committed, and have only 
applied our own workaround & fix locally to our instances.

The issue: https://issues.apache.org/jira/browse/FLINK-35749 may also have been 
the culprit of this problem

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> 

[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2024-07-08 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863924#comment-17863924
 ] 

Mason Chen commented on FLINK-33545:


[~arvid] [~thw] I believe this is solved by 
https://issues.apache.org/jira/browse/FLINK-35749!

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This will result in dataloss of record #97 to #100
> Because KafkaWriter only catches error *after* callback, if callback is never 
> invoked (due to broker issue) right after the first flush has taken place, 
> those records are effectively gone unless someone decided to go back and look 
> for it.
> This behavior should be ok if user has set {*}DeliveryGuarantee.NONE{*}, but 
> is not expected for {*}DeliveryGuarantee.AT_LEAST_ONCE{*}.
> There is a divergence of the process in the event of {*}EXACTLY_ONCE{*}.
> prepareCommit will produce a list of KafkaCommittable that corresponds to 
> Transactional KafkaProducer to be committed. And a catch up flush will take 
> place during *commit* step. Whether this was intentional or not, due to the 
> fact that flush is a blocking call, the second flush for EXACTLY_ONCE at the 
> end of EXACTLY_ONCE actually ensured everything fenced in the current 
> checkpoint will be sent to Kafka, or fail the checkpoint if not successful.
>  
> Due the above finding, I'm recommending one of the following fixes:
>  # need to perform second flush for AT_LEAST_ONCE
>  # or move flush to the end of 

[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2024-07-08 Thread Arvid Heise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17863737#comment-17863737
 ] 

Arvid Heise commented on FLINK-33545:
-

Sorry for being late to this party. But I wonder if this fix isn't to strict. 
When it fails, it requires a restart where we probably have an easier way to 
ensure correctness without failing.
I think we haven't fully understood the problem yet and implement some 
workaround. If we look at the `flush` JavaDoc of KafkaProducer, we get

{noformat}
Invoking this method makes all buffered records immediately available to send 
(even if linger. ms is greater than 0) and blocks on the completion of the 
requests associated with these records. The post-condition of flush() is that 
any previously sent record will have completed (e. g. Future. isDone() == 
true). A request is considered completed when it is successfully acknowledged 
according to the acks configuration you have specified or else it results in an 
error.
Other threads can continue sending records while one thread is blocked waiting 
for a flush call to complete, however no guarantee is made about the completion 
of records sent after the flush call begins.
This method can be useful when consuming from some input system and producing 
into Kafka. The flush() call gives a convenient way to ensure all previously 
sent messages have actually completed.
This example shows how to consume from one Kafka topic and produce to another 
Kafka topic:
  for(ConsumerRecord record: consumer. poll(100))
 producer. send(new ProducerRecord("my-topic", record. key(), record. 
value());
 producer. flush();
 consumer. commitSync();
 
 
Note that the above example may drop records if the produce request fails. If 
we want to ensure that this does not occur we need to set 
retries= in our config.
Applications don't need to call this method for transactional producers, since 
the commitTransaction() will flush all buffered records before performing the 
commit. This ensures that all the send(ProducerRecord) calls made since the 
previous beginTransaction() are completed before the commit
{noformat}

That means that `flush` is guaranteed to get an ack on all pending messages. So 
to lose data, we either call `flush` out of order or there is a bug in `flush`.

We also know that Flink sink sees the barrier only after processing all 100 
records (in the initial example). Both prepareSnapshotPreBarrier and the actual 
snapshot happen in the same root call. No mailbox letter can be interleaved at 
this point. So Flink should flush all 100 records if `flush` is not buggy.

One oddity about flush is that flush can be called concurrently. Look at how 
flush is implemented in KafkaProducer.
{code:java}
public void flush() {
log.trace("Flushing accumulated records in producer.");

long start = time.nanoseconds();
this.accumulator.beginFlush();
this.sender.wakeup();
try {
this.accumulator.awaitFlushCompletion();
} catch (InterruptedException e) {
throw new InterruptException("Flush interrupted.", e);
} finally {
producerMetrics.recordFlush(time.nanoseconds() - start);
}
}
public void awaitFlushCompletion() throws InterruptedException {
try {
// Obtain a copy of all of the incomplete ProduceRequestResult(s) 
at the time of the flush.
// We must be careful not to hold a reference to the 
ProduceBatch(s) so that garbage
// collection can occur on the contents.
// The sender will remove ProducerBatch(s) from the original 
incomplete collection.
for (ProduceRequestResult result : this.incomplete.requestResults())
result.await();
} finally {
this.flushesInProgress.decrementAndGet();
}
}
{code}

So there is a chance that we have multiple flushesInProgress and our call to 
`flush` returns prematurely. However, I don't spot the issue immediately.

One thing that would certainly help, would be to get some debug/trace logs of 
the actual data loss.

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track 

[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2024-02-20 Thread Kevin Tseng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17819079#comment-17819079
 ] 

Kevin Tseng commented on FLINK-33545:
-

Hi [~mason6345] 

I have replicated the behavior you referred to in
{code:java}
https://github.com/apache/flink-connector-kafka/blob/15f2662eccf461d9d539ed87a78c9851cd17fa43/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L1107
{code}
based on the logic displayed, it seemed *FlinkKafkaProducer* (the now 
deprecated producer) also considered the possibility of race condition,

as *pendingRecords* is only increased in *FlinkKafkaProducer::invoke* akin to 
*KafkaWriter::write* / *FlinkKafkaInternalProducer::send*

and decreased only in *FlinkKafkaProducer::acknowledgeMessage* that is only 
invoked within *Callback::onCompletion* akin to 
*KafkaWriter$WriterCallback::onCompletion*

the updated implementation are:
 # add pendingRecords typed AtomicLong (same as FlinkKafkaProducer)
 # increase the variable in FlinkKafkaInternalProducer::send
 # create an intermediate callback class TrackingCallback that decorates 
callback parameter of FlinkKafkaInternalProducer::send
 # decrease pendingRecords in the TrackingCallback decorated class
 # check the pendingRecords variable after flush, within 
FlinkKafkaInternalProducer::flush to ensure nothing else was sent while 
flushing has taken place, and throw IllegalStateException (same exception as 
FlinkKafkaProducer)

 

 

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved 

[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2024-02-15 Thread Kevin Tseng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817758#comment-17817758
 ] 

Kevin Tseng commented on FLINK-33545:
-

Hi [~mason6345] , [~yang] 

I have simplified the approach to implement the suggestion.

Now the change is:
 # changed hasRecordsInBuffer flag to flushGated
 # only set this to true at
 ## the beginning of flush, and unset at the end of flush within 
FlinkKafkaInternalProducer
 ## the beginning of commitTransaction, and unset at the end of 
commitTransaction within FlinkKafkaInternalProducer
 # added check for this flag in FlinkKafkaInternalProducer::send and throw 
KafkaException when detected to be true (flush gating is taking place)
 # added FlinkKafkaInternalProducer::setFlushGate(boolean closed) as public for 
unit testing purpose

this limited the modification to the code at a minimal level to only 
FlinkKafkaInternalProducer and KafkaWriterITCase classes, and should not have 
any impact on any other scenario given that commitTransaction & flush should be 
an atomic operation in KafkaProducer

Are you guys ok with the above approach?

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This will result in dataloss of record #97 to #100
> Because KafkaWriter only catches error *after* callback, if callback is never 
> invoked (due to broker issue) right after the first flush has taken place, 
> those records are effectively gone unless someone decided to go back 

[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2024-02-13 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817136#comment-17817136
 ] 

Mason Chen commented on FLINK-33545:


Hi [~aeolus811tw], have you had time to review the followup feedback? As 
mentioned before there's no guarantee that a second commit would succeed and it 
would possibly need multiple. The best way to do this is by throwing an 
exception and allowing Flink to restart and try again 

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This will result in dataloss of record #97 to #100
> Because KafkaWriter only catches error *after* callback, if callback is never 
> invoked (due to broker issue) right after the first flush has taken place, 
> those records are effectively gone unless someone decided to go back and look 
> for it.
> This behavior should be ok if user has set {*}DeliveryGuarantee.NONE{*}, but 
> is not expected for {*}DeliveryGuarantee.AT_LEAST_ONCE{*}.
> There is a divergence of the process in the event of {*}EXACTLY_ONCE{*}.
> prepareCommit will produce a list of KafkaCommittable that corresponds to 
> Transactional KafkaProducer to be committed. And a catch up flush will take 
> place during *commit* step. Whether this was intentional or not, due to the 
> fact that flush is a blocking call, the second flush for EXACTLY_ONCE at the 
> end of EXACTLY_ONCE actually ensured everything fenced in the current 
> checkpoint will be sent to Kafka, or fail the 

[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2024-01-29 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17812113#comment-17812113
 ] 

Mason Chen commented on FLINK-33545:


[~yang] yup that's what I'm proposing. We could track an AtomicInteger and 
decrement it in the writer callback. Then, we need to assert that the counter 
is 0 (this is what we assume to be the contract of the KafkaProducer#flush 
API). 

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This will result in dataloss of record #97 to #100
> Because KafkaWriter only catches error *after* callback, if callback is never 
> invoked (due to broker issue) right after the first flush has taken place, 
> those records are effectively gone unless someone decided to go back and look 
> for it.
> This behavior should be ok if user has set {*}DeliveryGuarantee.NONE{*}, but 
> is not expected for {*}DeliveryGuarantee.AT_LEAST_ONCE{*}.
> There is a divergence of the process in the event of {*}EXACTLY_ONCE{*}.
> prepareCommit will produce a list of KafkaCommittable that corresponds to 
> Transactional KafkaProducer to be committed. And a catch up flush will take 
> place during *commit* step. Whether this was intentional or not, due to the 
> fact that flush is a blocking call, the second flush for EXACTLY_ONCE at the 
> end of EXACTLY_ONCE actually ensured everything fenced in the current 
> checkpoint will be sent to Kafka, or fail the checkpoint if not successful.
>  
> Due the 

[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2024-01-29 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17811763#comment-17811763
 ] 

Martijn Visser commented on FLINK-33545:


Since the reported exception is related to the Kafka Client, the first thing I 
actually want to see is that this gets reproduced with the latest version of 
the Flink Kafka connector. The currently latest available Flink Kafka connector 
(v3.0.2) uses Kafka Client 3.2.3, while the newer Flink Kafka Connector v3.1 
that's currently undergoing a vote uses Kafka Client v3.4.0. It can very well 
be that we encountered a bug in the used Kafka Client, that has been fixed 
since. 

Even better, we actually should upgrade the used Kafka Client to the latest 
version (v3.6.0). Potentially we should even wait until the 2PC changes are 
coming in. 

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This will result in dataloss of record #97 to #100
> Because KafkaWriter only catches error *after* callback, if callback is never 
> invoked (due to broker issue) right after the first flush has taken place, 
> those records are effectively gone unless someone decided to go back and look 
> for it.
> This behavior should be ok if user has set {*}DeliveryGuarantee.NONE{*}, but 
> is not expected for {*}DeliveryGuarantee.AT_LEAST_ONCE{*}.
> There is a divergence of the process in the event of {*}EXACTLY_ONCE{*}.
> prepareCommit will produce a list of KafkaCommittable that 

[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2024-01-28 Thread Yang LI (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17811646#comment-17811646
 ] 

Yang LI commented on FLINK-33545:
-

Hello,

If I understand correctly, [~mason6345] 's proposal doesn't necessarily imply 
that we need to track each exact pending record. Instead, we can continue 
utilizing [~aeolus811tw] 's variable to monitor pending records. If this 
variable remains true after the first flush, rather than attempting a second 
flush, we should consider failing the checkpoint. This could compel Flink to 
replay from the last checkpoint.

Thanks

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This will result in dataloss of record #97 to #100
> Because KafkaWriter only catches error *after* callback, if callback is never 
> invoked (due to broker issue) right after the first flush has taken place, 
> those records are effectively gone unless someone decided to go back and look 
> for it.
> This behavior should be ok if user has set {*}DeliveryGuarantee.NONE{*}, but 
> is not expected for {*}DeliveryGuarantee.AT_LEAST_ONCE{*}.
> There is a divergence of the process in the event of {*}EXACTLY_ONCE{*}.
> prepareCommit will produce a list of KafkaCommittable that corresponds to 
> Transactional KafkaProducer to be committed. And a catch up flush will take 
> place during *commit* step. Whether this was intentional or not, due to the 
> fact that flush is a blocking call, the second flush for 

[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2024-01-28 Thread Kevin Tseng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17811642#comment-17811642
 ] 

Kevin Tseng commented on FLINK-33545:
-

Hi [~mason6345] 

I'm not sure that's possible without going deeper into KafkaProducer code.

KafkaProducer is only a facade to the actual buffer that holds and sends record 
via RecordAccumulator class, it then performs a blocking wait on the iterative 
request result of which doesn't really give us any insight into how many 
records have been synced successfully. To achieve what is described on tracking 
exact pending record will require us to go deeper into the Kafka internal 
classes, which might make this more prone to issues

as per my PR that i have done the following:
 # use variable to track whether there's pending record
 # if there is and Guarantee is AT_LEAST_ONCE, we perform second commit / 
follow EXACTLY_ONCE path to trigger the commit on producer
 # if there's any failure, it should throw error immediately as per 
EXACTLY_ONCE route

and since step 3 is the final step before flink checkpoint is committed, if 
this happens the retry logic should take place as you described

Do let me know if this implementation satisfies the proposal: [~yang] 
[~tzulitai] [~martijnvisser] 

Thanks

 

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This will result in dataloss of record #97 to #100
> Because KafkaWriter only catches error *after* callback, if callback 

[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2024-01-26 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17811156#comment-17811156
 ] 

Mason Chen commented on FLINK-33545:


I do think this issue here is not with the Flink runtime but it could be 
possible that the Kafka client flush doesn't flush all records (I haven't 
personally encountered this yet). As explained, if even there was a bug with 
the Flink runtime, the proper fix would need to be in Flink, rather than this 
connector.

Your proposal is to pretty much explicitly retry the flush, but there's no 
telling if that 2nd flush won't exhibit the same behavior as the first. As 
such, the best thing that the connector can do here is to 
 # maintain a counter of records to be flushed.
 # on completion of flush during checkpoint phase, verify that there are no 
exceptions AND no pending records.
 # if there was lingering records, fail immediately–causing job restart and 
subsequently checkpoint to fail.

The best thing we can do here is to fail and replay from last checkpoint, to 
maintain at least/exactly once semantics. This is also inline with how 
FlinkKafkaProducer has implemented this. My best guess is there is some race 
condition during broker network issues. I think we can implement this first and 
make carefully logging when this condition occurs.

WDYT of my proposal? [~aeolus811tw] [~yang] [~tzulitai] [~martijnvisser] 

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This 

[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2024-01-25 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17811037#comment-17811037
 ] 

Martijn Visser commented on FLINK-33545:


[~mason6345] WDYT about [~aeolus811tw] his last comments? 

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This will result in dataloss of record #97 to #100
> Because KafkaWriter only catches error *after* callback, if callback is never 
> invoked (due to broker issue) right after the first flush has taken place, 
> those records are effectively gone unless someone decided to go back and look 
> for it.
> This behavior should be ok if user has set {*}DeliveryGuarantee.NONE{*}, but 
> is not expected for {*}DeliveryGuarantee.AT_LEAST_ONCE{*}.
> There is a divergence of the process in the event of {*}EXACTLY_ONCE{*}.
> prepareCommit will produce a list of KafkaCommittable that corresponds to 
> Transactional KafkaProducer to be committed. And a catch up flush will take 
> place during *commit* step. Whether this was intentional or not, due to the 
> fact that flush is a blocking call, the second flush for EXACTLY_ONCE at the 
> end of EXACTLY_ONCE actually ensured everything fenced in the current 
> checkpoint will be sent to Kafka, or fail the checkpoint if not successful.
>  
> Due the above finding, I'm recommending one of the following fixes:
>  # need to perform second flush for AT_LEAST_ONCE
>  # or move flush to the end of the KafkaSink process.
> I'm 

[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2024-01-16 Thread Yang LI (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17807401#comment-17807401
 ] 

Yang LI commented on FLINK-33545:
-

Hello [~aeolus811tw] [~mason6345] , Just find this ticket, do you think if 
there will be a fix about this?
I'll need to activate kafkaProducer property "batch.size" , "linger.ms" and 
"compression.type" for batching and compression and we use at_least_once 
semantic and kafka ack "1".  I imagine I am in the scenario described by 
[~aeolus811tw] 

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This will result in dataloss of record #97 to #100
> Because KafkaWriter only catches error *after* callback, if callback is never 
> invoked (due to broker issue) right after the first flush has taken place, 
> those records are effectively gone unless someone decided to go back and look 
> for it.
> This behavior should be ok if user has set {*}DeliveryGuarantee.NONE{*}, but 
> is not expected for {*}DeliveryGuarantee.AT_LEAST_ONCE{*}.
> There is a divergence of the process in the event of {*}EXACTLY_ONCE{*}.
> prepareCommit will produce a list of KafkaCommittable that corresponds to 
> Transactional KafkaProducer to be committed. And a catch up flush will take 
> place during *commit* step. Whether this was intentional or not, due to the 
> fact that flush is a blocking call, the second flush for EXACTLY_ONCE at the 
> end of EXACTLY_ONCE actually ensured everything fenced in the 

[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2023-12-06 Thread Kevin Tseng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17793866#comment-17793866
 ] 

Kevin Tseng commented on FLINK-33545:
-

Hi Mason,

Kafka Producer only throw exception in following situation:
 # broker connection failure (either by broker or intermediary means such as 
firewall timeout)
 # timeout took place (delivery or request timeout)
 # record issue that violates producer setting (request size limit etc)

anything other than these producer will keep retrying (as default of retry is 
Integer.MAX_VALUE) and will not throw error

however a flush should block until any of the above 3 condition took place.

this can be verified using the snippet i included above in the description of 
the issue.

that being said, due to the nature of KafkaProducer and stability of it, I had 
to assume it worked as intended.

if a task is truly single thread, including committing / barrier handling, 
there shouldn't be any data loss. But that's not what was observed.

the records that were lost generally aren't large in volume, only resembles one 
or two producer thread failure. And in the case of AT_LEAST_ONCE this shouldn't 
happen.

Given that, I think it might be safer to put commit / flush to the end of the 
commit cycle, instead of before actual checkpoint is taking place.

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This will result in dataloss of record #97 to #100
> Because KafkaWriter only 

[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2023-12-05 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17793457#comment-17793457
 ] 

Mason Chen commented on FLINK-33545:


The reason why you don't find any synchronization is because the task is single 
threaded. Even if the issue you describe exists, it would need to be fixed in 
the Flink runtime and not the Kafka connector, as such a bug would affect all 
sink connectors.

Assuming the Flink runtime is correct, the only way for data loss to occur if 
there is a case when the Kafka Producer API doesn't throw an exception when the 
broker has not ack'ed the record.

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This will result in dataloss of record #97 to #100
> Because KafkaWriter only catches error *after* callback, if callback is never 
> invoked (due to broker issue) right after the first flush has taken place, 
> those records are effectively gone unless someone decided to go back and look 
> for it.
> This behavior should be ok if user has set {*}DeliveryGuarantee.NONE{*}, but 
> is not expected for {*}DeliveryGuarantee.AT_LEAST_ONCE{*}.
> There is a divergence of the process in the event of {*}EXACTLY_ONCE{*}.
> prepareCommit will produce a list of KafkaCommittable that corresponds to 
> Transactional KafkaProducer to be committed. And a catch up flush will take 
> place during *commit* step. Whether this was intentional or not, due to the 
> fact that flush is a blocking 

[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2023-11-30 Thread Kevin Tseng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17791903#comment-17791903
 ] 

Kevin Tseng commented on FLINK-33545:
-

Hi Mason,

thank you for the comment.

In the issue reported i made couple assumptions:
 # Flink Checkpoint does complete without any hindrance, causing future job 
failure not to recover from proper offset, leading to data loss
 # Broker has successfully acked record during first flush (with ack=-1 or 
ack=all) as by design, making it impossible for data loss to be originated from 
broker issue

 

 
{code:java}
Are you saying that an exception is not thrown by the KafkaWriter#flush method 
due to the Kafka producer timeout configuration?{code}
observation showed that first flush does complete successfully, which allowed 
checkpoint to proceed, job failure caused by Broker outage is observed and 
Flink will attempt to restore from the created checkpoint (the one that was 
successfully created)
{code:java}
The checkpoint barrier would have been between record 96 and 97. So the 
checkpoint afterwards would covers the at-least-once guarantee of 97-100. If 
you are saying that the checkpoint barrier is not respected, then that would be 
a bug in Flink.{code}
 

my understanding is based on the documentation

[https://nightlies.apache.org/flink/flink-docs-master/docs/learn-flink/fault_tolerance/]

[https://flink.apache.org/2018/02/28/an-overview-of-end-to-end-exactly-once-processing-in-apache-flink-with-apache-kafka-too/]

checkpoint barrier produced by the initial operator of a task manager 
(KafkaSource in this case) and flow along the graph within the same task 
manager, not every operator will generate a barrier.

In this scenario KafkaSource will generate a barrier at record 100 instead of 
between 96 and 97, which it will successfully complete as there's no issue 
committing read offset back to Broker at this point.

this is based on the precondition of everything before the barrier generated 
will be processed or tracked in the state, if anything breaks this 
precondition, there will be data loss.

Due to this, i believe the issue lies within the implementation of KafkaSink 
that did not fully honor the contract of checkpoint.

 

 
{code:java}
By any chance, does your KafkaSink serialization schema produce multiple 
records from one input record? {code}
data loss has been observed in 1-to-1 record streaming as well as 1-to-many 
scenarios.

 

 

 
{code:java}
The original description describes a scenario when the flush succeeds after a 
broker becomes healthy again. If the flush request is within checkpoint 
timeout, then this is fine right? If not, I would expect Flink to fail the 
checkpoint as it as exceeded the checkpoint timeout.{code}
 

As per observation, if broker became healthy within the producer timeout / 
checkpoint timeout period, there's no data loss observed.

If broker incurred network / unexpected failure for a prolong period of time, 
data loss will be observed from time to time.

the requirement for data loss to appear to be Flink failure induced restart.

 

My hypothesis after process of elimination, albeit with some uncertainty is 
that:

Based on the information illustrated in the implementation of prebarrier: 
https://issues.apache.org/jira/browse/FLINK-9428

Pre-Barrier will be invoked before barrier is emitted, but there's no 
mentioning of Pre-Barrier blocking other records from being processed 
simultaneously and I was not able to find any synchronize / blocking.

If the above is true and Flink failed right after checkpoint succeed, there 
will be data loss as KafkaSink does not do anything after Pre-Barrier phase 
except when being set as EXACTLY_ONCE

However, given that i'm not able to confirm some of the suspicion i have, I 
figured the safest way to perform flush is within the commit phase of Two Phase 
Commit, instead of pre-commit / prepareCommit phase, hence the adding flush to 
KafkaCommitter for AT_LEAST_ONCE.

 

Let me know if I have any misunderstand of how checkpointing, barrier handling, 
and 2PC works, or if there's any mistake in my assumption.

Thanks

 

 

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint 

[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2023-11-30 Thread Mason Chen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17791866#comment-17791866
 ] 

Mason Chen commented on FLINK-33545:


Hi all, I just saw this ticket. I've read through the thread here and I'm still 
confused how this can happen.
{quote}The problem with the first flush is that it happened before actual 
snapshot is tracked, inside `SinkWriteOperator.prepareSnapshotPreBarrier`

If broker is alive and well, the first flush will have no issue, which is what 
we are seeing.

AT_LEAST_ONCE and NONE are currently behaving the same in this regards, since 
they don't generate commitable after this.
{quote}
Are you saying that an exception is not thrown by the KafkaWriter#flush method 
due to the Kafka producer timeout configuration?
{quote}from this point til the actual snapshot being triggered in 
`KafkaSource.snapshotState` (record #1 ~ #100) there seem to be a race 
condition where some record may have been ingested, but wasn't part of the 
first flush (record #1 ~ #96) and have now made it to the producer (#97 ~ #100).
{quote}
The checkpoint barrier would have been between record 96 and 97. So the 
checkpoint afterwards would covers the at-least-once guarantee of 97-100.

The original description describes a scenario when the flush succeeds after a 
broker becomes healthy again. If the flush request is within checkpoint 
timeout, then this is fine right? If not, I would expect Flink to fail the 
checkpoint as it as exceeded the checkpoint timeout.

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>  Labels: pull-request-available
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously 

[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2023-11-16 Thread Kevin Tseng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17786933#comment-17786933
 ] 

Kevin Tseng commented on FLINK-33545:
-

I'm also aware of the issue FLINK-31305

i don't believe that change really make any difference here.

it changed from throwing FlinkRuntimeException within WriterCallback to storing 
it in asyncProducerException

I would say original way to handle the exception was fine, as flush is a 
blocking call that will always trigger WriterCallback, storing then check v.s 
throwing right away doesn't really make any difference

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This will result in dataloss of record #97 to #100
> Because KafkaWriter only catches error *after* callback, if callback is never 
> invoked (due to broker issue) right after the first flush has taken place, 
> those records are effectively gone unless someone decided to go back and look 
> for it.
> This behavior should be ok if user has set {*}DeliveryGuarantee.NONE{*}, but 
> is not expected for {*}DeliveryGuarantee.AT_LEAST_ONCE{*}.
> There is a divergence of the process in the event of {*}EXACTLY_ONCE{*}.
> prepareCommit will produce a list of KafkaCommittable that corresponds to 
> Transactional KafkaProducer to be committed. And a catch up flush will take 
> place during *commit* step. Whether this was intentional or not, due to the 
> fact that flush is a blocking call, the second flush for EXACTLY_ONCE at the 
> end of EXACTLY_ONCE 

[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2023-11-16 Thread Kevin Tseng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17786907#comment-17786907
 ] 

Kevin Tseng commented on FLINK-33545:
-

Hi Tzu-Li,

Thanks for the response.

Below is my analysis of the issue, and please correct me if i am wrong on any 
part

 
{code:java}
this is where I'm a bit lost. A flush() call on a producer is blocking until 
all records are acked + after the blocking flush returns, we check if any of 
the records that were flushed resulted in an error in which case we fail the 
job and not complete the checkpoint.{code}
 

The problem with the first flush is that it happened before actual snapshot is 
tracked, inside `SinkWriteOperator.prepareSnapshotPreBarrier`
{code:java}
If broker is alive and well, the first flush will have no issue, which is what 
we are seeing.{code}
AT_LEAST_ONCE and NONE are currently behaving the same in this regards, since 
they don't generate commitable after this.

from this point til the actual snapshot being triggered in 
`KafkaSource.snapshotState` (record #1 ~ #100) there seem to be a race 
condition where some record may have been ingested, but wasn't part of the 
first flush (record #1 ~ #96) and have now made it to the producer (#97 ~ #100).

since the whole flink process is concurrent it only allows the internal 
KafkaProducer to actually commit its current buffer, not necessary all records 
that are still flowing through. And the way flink is utilizing KafkaProducer 
asynchronously, it can't catch error until KafkaProducer actually attempted to 
commit (flush).

There is only a short window of time between the first flush and the 
snapshotState of the KafkaSource (approx 15ms at most for situations that i 
have tested)

 
{code:java}
If I am understanding you correctly, as it sounds like in this ticket, that 
second flush actually is not a no-op?{code}
 

You are correct that the 2nd flush is a no-op as commitTransaction will flush 
before committing; making it actually doing 3 flushes in total when 
EXACTLY_ONCE is set. But there's still a flush that ensure all records have 
been committed before completing the checkpoint.

the problem is there's no 2nd flush for AT_LEAST_ONCE when checkpoint is 
finalizing to ensure there is no data left in the buffer / still being sent by 
the internal KafkaProducer.

This resulted in the current triggered checkpoint to be successful, as it only 
required first flush to be successful.

I supposed the original design decided that any read status of the non-flushed 
records can go into next checkpoint, but if broker is having issue at this 
point then the next checkpoint will not be successful, causing Flink restart 
from previous successful checkpoint handling to kick in.

 
{code:java}
So, how could the KafkaWriter only flush records #1 to #96 and already proceed 
to process the checkpoint barrier? This can only ever happen with unaligned 
checkpointing, and even in that case, records #97 to #100 will be part of the 
channel state that will be replayed upon recovery.{code}
I'm assuming this is referring to channel state of KafkaSink / functional 
operator precede it?

doesn't this require KafkaSource to keep track of these uncommitted records? or 
operators before KafkaSink and after KafkaSource to keep these records in 
states? because once it reaches KafkaSink, it doesn't keep track of anything 
beyond first flush.

 

I made following modification to test this scenario:
 # added flag to track pending records in FlinkKafkaInternalProducer between 
send & flush methods
 # changed prepareCommit to also check for this flag, and only return empty 
commitable by default if DeliveryGuarantee.NONE is set
 # in KafkaCommitter i use the flag producer.isInTransaction to determine 
whether it should be a commitTransaction call / flush call (as with the change 
#1 & #2 AT_LEAST_ONCE will also reach this segment)

in my testing the
{code:java}
LOG.debug("Committing {} committables.", committables); {code}
debug line that's supposed to be triggered only if there's record to be 
flushed/committed do get triggered from time to time when there's broker 
stability issue, confirming my suspicion.

 

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to 

[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2023-11-16 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17786873#comment-17786873
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-33545:
-

https://issues.apache.org/jira/browse/FLINK-33293 related issue

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This will result in dataloss of record #97 to #100
> Because KafkaWriter only catches error *after* callback, if callback is never 
> invoked (due to broker issue) right after the first flush has taken place, 
> those records are effectively gone unless someone decided to go back and look 
> for it.
> This behavior should be ok if user has set {*}DeliveryGuarantee.NONE{*}, but 
> is not expected for {*}DeliveryGuarantee.AT_LEAST_ONCE{*}.
> There is a divergence of the process in the event of {*}EXACTLY_ONCE{*}.
> prepareCommit will produce a list of KafkaCommittable that corresponds to 
> Transactional KafkaProducer to be committed. And a catch up flush will take 
> place during *commit* step. Whether this was intentional or not, due to the 
> fact that flush is a blocking call, the second flush for EXACTLY_ONCE at the 
> end of EXACTLY_ONCE actually ensured everything fenced in the current 
> checkpoint will be sent to Kafka, or fail the checkpoint if not successful.
>  
> Due the above finding, I'm recommending one of the following fixes:
>  # need to perform second flush for AT_LEAST_ONCE
>  # or move flush to the end of the KafkaSink process.
> I'm leaning towards 2nd option as it 

[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2023-11-16 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17786863#comment-17786863
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-33545:
-

{code}

In a scenario that:
 * KafkaSource ingested records #1 to #100
 * KafkaSink only had chance to send records #1 to #96
 * with a batching interval of 5ms

{code}

this is where I'm a bit lost. A flush() call on a producer is blocking until 
all records are acked + after the blocking flush returns, we check if any of 
the records that were flushed resulted in an error in which case we fail the 
job and not complete the checkpoint. See that code here:

[https://github.com/apache/flink-connector-kafka/blob/main/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L201-L208]

So, how could the KafkaWriter only flush records #1 to #96 and already proceed 
to process the checkpoint barrier?

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This will result in dataloss of record #97 to #100
> Because KafkaWriter only catches error *after* callback, if callback is never 
> invoked (due to broker issue) right after the first flush has taken place, 
> those records are effectively gone unless someone decided to go back and look 
> for it.
> This behavior should be ok if user has set {*}DeliveryGuarantee.NONE{*}, but 
> is not expected for {*}DeliveryGuarantee.AT_LEAST_ONCE{*}.
> There is a divergence of the process in the event of {*}EXACTLY_ONCE{*}.
> 

[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2023-11-16 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17786860#comment-17786860
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-33545:
-

hey [~aeolus811tw], thanks for the detailed writeup.

I'm still trying to understand the need for the second flush, but wanted to 
give a quick comment on:

> prepareCommit will produce a list of KafkaCommittable that corresponds to 
>Transactional KafkaProducer to be committed. And a catch up flush will take 
>place during *commit* step. Whether this was intentional or not, due to the 
>fact that flush is a blocking call, the second flush for EXACTLY_ONCE at the 
>end of EXACTLY_ONCE actually ensured everything fenced in the current 
>checkpoint will be sent to Kafka, or fail the checkpoint if not successful.

the {{producer.commitTransaction()}} call does a flush first before sending out 
the request to commit the Kafka transaction. The assumption in the 
{{KafkaWriter}} code has been that by the time {{commitTransaction()}} is 
called, all pending records are already flushed, so the internal flush done at 
commit time should have been a no-op.

If I am understanding you correctly, as it sounds like in this ticket, that 
second flush actually is not a no-op?

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Assignee: Kevin Tseng
>Priority: Major
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be triggered until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> Properties props = new Properties(); 
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
> props.put(ProducerConfig.CLIENT_ID_CONFIG, "example-producer");
> props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName()); 
> props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); 
> props.put(ProducerConfig.ACKS_CONFIG, "all"); 
> final KafkaProducer producer = new KafkaProducer<>(props);
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 10 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This will result in dataloss of record #97 to #100
> Because KafkaWriter only catches error *after* callback, if callback is never 
> invoked (due to broker issue) right after the first 

[jira] [Commented] (FLINK-33545) KafkaSink implementation can cause dataloss during broker issue when not using EXACTLY_ONCE if there's any batching

2023-11-14 Thread Kevin Tseng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17786084#comment-17786084
 ] 

Kevin Tseng commented on FLINK-33545:
-

we have a fix to address this issue, can anyone able assign this bug to me, 
thanks!

> KafkaSink implementation can cause dataloss during broker issue when not 
> using EXACTLY_ONCE if there's any batching
> ---
>
> Key: FLINK-33545
> URL: https://issues.apache.org/jira/browse/FLINK-33545
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.18.0
>Reporter: Kevin Tseng
>Priority: Major
>
> In the current implementation of KafkaSource and KafkaSink there are some 
> assumption that were made:
>  # KafkaSource completely relies on Checkpoint to manage and track its offset 
> in *KafkaSourceReader* class
>  # KafkaSink in *KafkaWriter* class only performs catch-flush when 
> *DeliveryGuarantee.EXACTLY_ONCE* is specified.
> KafkaSource is assuming that checkpoint should be properly fenced and 
> everything it had read up-til checkpoint being initiated will be processed or 
> recorded by operators downstream, including the TwoPhaseCommiter such as 
> *KafkaSink*
> *KafkaSink* goes by the model of:
>  
> {code:java}
> flush -> prepareCommit -> commit{code}
>  
> In a scenario that:
>  * KafkaSource ingested records #1 to #100
>  * KafkaSink only had chance to send records #1 to #96
>  * with a batching interval of 5ms
> when checkpoint has been initiated, flush will only confirm the sending of 
> record #1 to #96.
> This allows checkpoint to proceed as there's no error, and record #97 to 100 
> will be batched after first flush.
> Now, if broker goes down / has issue that caused the internal KafkaProducer 
> to not be able to send out the record after a batch, and is on a constant 
> retry-cycle (default value of KafkaProducer retries is Integer.MAX_VALUE), 
> *WriterCallback* error handling will never be trigger until the next 
> checkpoint flush.
> This can be tested by creating a faulty Kafka cluster and run the following 
> code:
> {code:java}
> try {
> for (int i = 0; i < 10; i++) {
> System.out.printf("sending record #%d\n", i);
> String data = UUID.randomUUID().toString();
> final ProducerRecord record = new 
> ProducerRecord<>(TOPIC, Integer.toString(i), data);
> producer.send(record, new CB(Integer.toString(i), data));
> Thread.sleep(1); //sleep for 5 seconds
> }
> } catch (Exception e) {
> e.printStackTrace();
> } finally {
> System.out.println("flushing");
> producer.flush();
> System.out.println("closing");
> producer.close();
> }{code}
> Once callback returns due to network timeout, it will cause Flink to restart 
> from previously saved checkpoint (which recorded reading up to record #100), 
> but KafkaWriter never sent record #97 to #100.
> This will result in dataloss of record #97 to #100
> Because KafkaWriter only catches error *after* callback, if callback is never 
> invoked (due to broker issue) right after the first flush has taken place, 
> those records are effectively gone unless someone decided to go back and look 
> for it.
> This behavior should be ok if user has set {*}DeliveryGuarantee.NONE{*}, but 
> is not expected for {*}DeliveryGuarantee.AT_LEAST_ONCE{*}.
> There is a divergence of the process in the event of {*}EXACTLY_ONCE{*}. ** 
> prepareCommit will produce a list of KafkaCommittable that corresponds to 
> Transactional KafkaProducer to be committed. And a catch up flush will take 
> place during *commit* step. Whether this was intentional or not, due to the 
> fact that flush is a blocking call, the second flush for EXACTLY_ONCE at the 
> end of EXACTLY_ONCE actually ensured everything fenced in the current 
> checkpoint will be sent to Kafka, or fail the checkpoint if not successful.
>  
> Due the above finding, I'm recommending one of the following fixes:
>  # need to perform second flush for AT_LEAST_ONCE
>  # or move flush to the end of the KafkaSink process.
> I'm leaning towards 2nd option as it does not make sense to flush then do 
> checkpoint, it should be right before checkpoint completes then we flush, 
> given that's what commit is meant to do.
>  
> This issue: https://issues.apache.org/jira/browse/FLINK-31305 was supposed to 
> fix this but it never really did.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)