This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d36b24f45f7 KAFKA-17299: Fix Kafka Streams consumer hang issue (#17899)
d36b24f45f7 is described below
commit d36b24f45f700de318062eeeda90cfb657892151
Author: Laxman Ch <[email protected]>
AuthorDate: Sat Nov 23 08:02:00 2024 +0530
KAFKA-17299: Fix Kafka Streams consumer hang issue (#17899)
When Kafka Streams skips over corrupted messages, it might not resume
previously paused partitions if more than one record is skipped at once
and the buffer drops below the max-buffer limit at the same time.
Reviewers: Matthias J. Sax <[email protected]>
---
.../java/org/apache/kafka/streams/processor/internals/StreamTask.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 1fdd298088b..3ea6a374e84 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -786,7 +786,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         // after processing this record, if its partition queue's buffered
         // size has been decreased to the threshold, we can then resume the
         // consumption on this partition
-        if (recordInfo.queue().size() == maxBufferedSize) {
+        if (recordInfo.queue().size() <= maxBufferedSize) {
             partitionsToResume.add(partition);
         }
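To illustrate the off-by-one nature of the bug, here is a minimal standalone sketch (not the actual Kafka Streams classes; the class and method names are hypothetical): when two corrupted records are skipped in a single pass, the queue size can jump from above the limit to below it without ever landing exactly on `maxBufferedSize`, so the `==` check never fires and the paused partition is never resumed.

```java
// Hypothetical model of the resume check in StreamTask; illustrative only.
public class ResumeCheckSketch {
    static final int MAX_BUFFERED_SIZE = 5;

    // Buggy check: fires only when the queue size lands exactly on the limit.
    static boolean shouldResumeBuggy(int queueSize) {
        return queueSize == MAX_BUFFERED_SIZE;
    }

    // Fixed check: fires once the queue has drained to or below the limit.
    static boolean shouldResumeFixed(int queueSize) {
        return queueSize <= MAX_BUFFERED_SIZE;
    }

    public static void main(String[] args) {
        // The queue is over the limit, so the partition was paused.
        int size = MAX_BUFFERED_SIZE + 1; // 6

        // Two corrupted records are skipped in one pass: 6 -> 4,
        // stepping over the exact value MAX_BUFFERED_SIZE (5).
        size -= 2;

        System.out.println("buggy check resumes: " + shouldResumeBuggy(size)); // false -> consumer hangs
        System.out.println("fixed check resumes: " + shouldResumeFixed(size)); // true  -> partition resumed
    }
}
```

With a single skipped record (6 -> 5) both checks agree; the divergence only appears on multi-record skips, which is why the hang was intermittent.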