This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 3.9 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit a902a5abb3ec218b56316dc5f3a8e90d7ad84d2d Author: Laxman Ch <[email protected]> AuthorDate: Sat Nov 23 08:02:00 2024 +0530 KAFKA-17299: Fix Kafka Streams consumer hang issue (#17899) When Kafka Streams skips over corrupted messages, it might not resume previously paused partitions, if more than one record is skipped at once, and if the buffer drops below the max-buffer limit at the same time. Reviewers: Matthias J. Sax <[email protected]> --- .../java/org/apache/kafka/streams/processor/internals/StreamTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 59ed6b92cd9..e26a7bbed3b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -792,7 +792,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, // after processing this record, if its partition queue's buffered size has been // decreased to the threshold, we can then resume the consumption on this partition - if (recordInfo.queue().size() == maxBufferedSize) { + if (recordInfo.queue().size() <= maxBufferedSize) { partitionsToResume.add(partition); }
