[ 
https://issues.apache.org/jira/browse/KAFKA-9312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17005419#comment-17005419
 ] 

ASF GitHub Bot commented on KAFKA-9312:
---------------------------------------

jonathansantilli commented on pull request #7877: KAFKA-9312: Wait for splitted 
batches to be processed after a KafkaProducer#flush()
URL: https://github.com/apache/kafka/pull/7877
 
 
   This commit adds logic to wait for split batches to be processed after a 
MESSAGE_TOO_LARGE error has been received.
   It also adds a new test class to cover the `IncompleteBatches` class.
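
One way to picture "waiting for split batches" is to tie the original batch's future to the batches it was split into. The following is a minimal sketch of that idea only, not the code in this PR: `Batch`, `SplitAwareFlushSketch`, `split`, and `parentWaitsForChildren` are hypothetical names, and the split is assumed to always produce exactly two batches.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Toy model only: these types are hypothetical stand-ins,
// not Kafka's ProducerBatch or RecordAccumulator.
class SplitAwareFlushSketch {
    static final class Batch {
        final CompletableFuture<Void> done = new CompletableFuture<>();
    }

    final Set<Batch> incomplete = ConcurrentHashMap.newKeySet();

    Batch add() {
        Batch b = new Batch();
        incomplete.add(b);
        return b;
    }

    void complete(Batch b) {
        b.done.complete(null);
        incomplete.remove(b);
    }

    // Assumed remedy: on a split, the parent's future completes only once
    // both child batches have completed, so waiters on the parent keep waiting.
    List<Batch> split(Batch parent) {
        Batch left = add();
        Batch right = add();
        CompletableFuture.allOf(left.done, right.done).whenComplete((ignored, err) -> {
            if (err != null) {
                parent.done.completeExceptionally(err);
            } else {
                parent.done.complete(null);
            }
        });
        incomplete.remove(parent);
        return Arrays.asList(left, right);
    }

    // Returns true if the parent stayed pending until both children finished.
    static boolean parentWaitsForChildren() {
        SplitAwareFlushSketch acc = new SplitAwareFlushSketch();
        Batch big = acc.add();
        List<Batch> parts = acc.split(big);
        boolean pendingAfterSplit = !big.done.isDone();
        acc.complete(parts.get(0));
        boolean pendingAfterFirst = !big.done.isDone();
        acc.complete(parts.get(1));
        return pendingAfterSplit && pendingAfterFirst && big.done.isDone();
    }

    public static void main(String[] args) {
        System.out.println(parentWaitsForChildren()); // prints "true"
    }
}
```

With this shape, a flush that captured the original batch before the split would still block until every batch derived from it is done. Other designs are possible, for example re-reading the incomplete set in a loop, at the cost of also waiting on unrelated newer batches.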
   
   This code adds a new package-private constructor to each of the classes 
`ProducerBatch` and `RecordAccumulator`. This decouples them from 
`ProduceRequestResult` and `IncompleteBatches` respectively, and allows the 
method `RecordAccumulator#awaitFlushCompletion()` to be tested.
   
   The [Jira ticket](https://issues.apache.org/jira/browse/KAFKA-9312) provides 
a [test 
here](https://github.com/lbradstreet/kafka/commit/733a683273c31823df354d0a785cb2c24365735a#diff-0b8da0c7ceecaa1f00486dadb53208b1R2339)
 that proves the error without these changes.
   This PR does not include that specific test since it involves sleeping the 
thread and could lead to nondeterministic behavior.
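
The failure mode itself can be modeled without sleeping, by taking the snapshot by hand before simulating the split. The sketch below is a toy model under that assumption, not a Kafka test: `Batch`, `FlushSnapshotDemo`, `split`, and `pendingAfterSnapshotFlush` are hypothetical names, and the real producer does the split on a separate sender thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Toy model only: these types are hypothetical stand-ins,
// not Kafka's ProducerBatch or RecordAccumulator.
class FlushSnapshotDemo {
    static final class Batch {
        final CompletableFuture<Void> done = new CompletableFuture<>();
    }

    final Set<Batch> incomplete = ConcurrentHashMap.newKeySet();

    Batch add() {
        Batch b = new Batch();
        incomplete.add(b);
        return b;
    }

    // Models a split: two new batches are registered, then the original
    // batch's future is completed and the original is removed.
    void split(Batch original) {
        add();
        add();
        original.done.complete(null);
        incomplete.remove(original);
    }

    // Waits only on the batches captured in the snapshot and returns how many
    // batches are still incomplete when that wait finishes.
    static int pendingAfterSnapshotFlush() {
        FlushSnapshotDemo acc = new FlushSnapshotDemo();
        Batch big = acc.add();
        List<Batch> snapshot = new ArrayList<>(acc.incomplete); // flush copies here
        acc.split(big);                                         // split happens afterwards
        for (Batch b : snapshot) {
            b.done.join(); // returns at once: the original is already completed
        }
        return acc.incomplete.size();
    }

    public static void main(String[] args) {
        System.out.println(pendingAfterSnapshotFlush()); // prints 2
    }
}
```

The wait returns while two batches are still pending, which is the broken guarantee the ticket describes.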
   
 


> KafkaProducer flush behavior does not guarantee completed sends under record 
> batch splitting
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-9312
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9312
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 1.0.0, 1.1.0, 2.0.0, 2.1.0, 2.2.0, 2.3.0, 2.4.0
>            Reporter: Lucas Bradstreet
>            Assignee: Jonathan Santilli
>            Priority: Major
>
> The KafkaProducer flush call guarantees that all records that have been sent 
> at the time of the flush call will either be sent successfully or result in 
> an error.
> The KafkaProducer will split record batches upon receiving a 
> MESSAGE_TOO_LARGE error from the broker. However, the flush behavior relies on 
> the accumulator checking only the incomplete sends that exist at the time of 
> the flush call.
> {code:java}
> public void awaitFlushCompletion() throws InterruptedException {
>     try {
>         for (ProducerBatch batch : this.incomplete.copyAll())
>             batch.produceFuture.await();
>     } finally {
>         this.flushesInProgress.decrementAndGet();
>     }
> }{code}
> When large record batches are split, the produceFuture of the batch in 
> question is completed, and the new batches are added to the incomplete list 
> of record batches. This breaks the flush guarantee: awaitFlushCompletion 
> finishes without awaiting the new split batches, because the pre-split 
> batches it was awaiting above have already been completed.
> This is demonstrated in a test case that can be found at 
> [https://github.com/lbradstreet/kafka/commit/733a683273c31823df354d0a785cb2c24365735a#diff-0b8da0c7ceecaa1f00486dadb53208b1R2339]
> This problem has likely been present since record batch splitting was added 
> in 0.11 (KAFKA-3995, KIP-126).


