[
https://issues.apache.org/jira/browse/KAFKA-12870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jason Gustafson resolved KAFKA-12870.
-------------------------------------
Resolution: Fixed
> RecordAccumulator stuck in a flushing state
> -------------------------------------------
>
> Key: KAFKA-12870
> URL: https://issues.apache.org/jira/browse/KAFKA-12870
> Project: Kafka
> Issue Type: Bug
> Components: producer, streams
> Affects Versions: 2.5.1, 2.8.0, 2.7.1, 2.6.2
> Reporter: Niclas Lockner
> Assignee: Jason Gustafson
> Priority: Major
> Fix For: 3.0.0, 2.8.1
>
> Attachments: RecordAccumulator.log, full.log
>
>
> After a Kafka Stream with exactly once enabled has performed its first
> commit, the RecordAccumulator within the stream's internal producer gets
> stuck in a state where all subsequent ProducerBatches that get allocated are
> immediately flushed instead of being held in memory until they expire,
> regardless of the stream's linger or batch size config.
> This is reproduced in the example code found at
> [https://github.com/niclaslockner/kafka-12870] which can be run with
> ./gradlew run --args=<bootstrap servers>
> The example has a producer that sends 1 record/sec to one topic, and a Kafka
> stream with EOS enabled that forwards the records from that topic to another
> topic with the configuration linger = 5 sec, commit interval = 10 sec.
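> The setup above corresponds to a Streams configuration roughly like the
> following. This is a hypothetical reconstruction (the application id is an
> assumption), using the standard Kafka config key strings; linger for the
> stream's internal producer is set via the `producer.` prefix:

```java
import java.util.Properties;

public class ReproConfig {
    // Builds the Streams configuration described in the report:
    // EOS enabled, commit interval 10 s, internal producer linger 5 s.
    public static Properties streamsConfig(String bootstrapServers) {
        Properties p = new Properties();
        p.put("bootstrap.servers", bootstrapServers);
        p.put("application.id", "kafka-12870-repro"); // assumed name
        p.put("processing.guarantee", "exactly_once"); // EOS enabled
        p.put("commit.interval.ms", "10000");          // commit every 10 s
        p.put("producer.linger.ms", "5000");           // internal producer linger = 5 s
        return p;
    }

    public static void main(String[] args) {
        System.out.println(streamsConfig("localhost:9092"));
    }
}
```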
>
> The expected behavior when running the example is that the stream's
> ProducerBatches will expire (or get flushed because of the commit) every
> five seconds, and that the stream's producer will send a ProduceRequest
> every five seconds with an expired ProducerBatch that contains 5 records.
> The actual behavior is that each ProducerBatch is made immediately available
> to the Sender, which sends one ProduceRequest per record.
>
> The example code contains a copy of the RecordAccumulator class (copied from
> kafka-clients 2.8.0) with some additional logging added to
> * RecordAccumulator#ready(Cluster, long)
> * RecordAccumulator#beginFlush()
> * RecordAccumulator#awaitFlushCompletion()
> These log entries show (see the attached RecordAccumulator.log)
> * that the batches are considered sendable because a flush is in progress
> * that Sender.maybeSendAndPollTransactionalRequest() calls
> RecordAccumulator's beginFlush() without also calling awaitFlushCompletion(),
> and that this makes RecordAccumulator's flushesInProgress oscillate between
> 1 and 2 instead of the expected 0 and 1.
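> The counter mechanics behind this can be illustrated with a simplified
> sketch. This is not the real RecordAccumulator; it models only the
> flushesInProgress counter (the real awaitFlushCompletion() also waits on
> batch futures, decrementing in a finally block):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class FlushDemo {
    // Minimal model of RecordAccumulator's flush bookkeeping: ready()
    // treats every batch as sendable while flushInProgress() is true.
    static class FlushCounterSketch {
        private final AtomicInteger flushesInProgress = new AtomicInteger(0);

        void beginFlush() {
            flushesInProgress.getAndIncrement();
        }

        void awaitFlushCompletion() {
            flushesInProgress.decrementAndGet();
        }

        boolean flushInProgress() {
            return flushesInProgress.get() > 0;
        }
    }

    public static void main(String[] args) {
        FlushCounterSketch acc = new FlushCounterSketch();

        // Balanced usage: the counter returns to 0, so linger is honoured.
        acc.beginFlush();
        acc.awaitFlushCompletion();
        System.out.println("balanced:   " + acc.flushInProgress());

        // The reported bug: beginFlush() without a matching
        // awaitFlushCompletion() leaves the counter permanently above 0,
        // so every subsequent batch looks immediately sendable.
        acc.beginFlush();
        System.out.println("unbalanced: " + acc.flushInProgress());
    }
}
```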
>
> This issue is not reproducible in version 2.3.1 or 2.4.1.
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)