[
https://issues.apache.org/jira/browse/KAFKA-5055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15990086#comment-15990086
]
Davor Poldrugo commented on KAFKA-5055:
---------------------------------------
Hi guys!
I think the problem is caused by a bug in the method
{{org.apache.kafka.streams.processor.internals.StreamTask#addRecords}}:
{code:java}
public int addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records) {
    final int oldQueueSize = partitionGroup.numBuffered();
    final int newQueueSize = partitionGroup.addRawRecords(partition, records);

    log.trace("{} Added records into the buffered queue of partition {}, new queue size is {}", logPrefix, partition, newQueueSize);

    // if after adding these records, its partition queue's buffered size has been
    // increased beyond the threshold, we can then pause the consumption for this partition
    if (newQueueSize > this.maxBufferedSize) {
        consumer.pause(singleton(partition));
    }

    return newQueueSize - oldQueueSize;
}
{code}
This line is the problem:
{{final int oldQueueSize = partitionGroup.numBuffered();}}
Instead of getting {{oldQueueSize}} for the current TopicPartition, it gets the queue size for all partitions, because {{partitionGroup.numBuffered()}} returns the buffered size across all partitions.
As a result, {{return newQueueSize - oldQueueSize}} can return a negative value, because the sum of the queue sizes across all partitions is usually bigger than the queue size of this single partition.
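For illustration, here is a minimal sketch with made-up numbers (the two partitions and the record counts are hypothetical, just to show how the difference goes negative):
{code:java}
// Hypothetical numbers: a task buffering records for two partitions, p0 and p1.
public class NegativeDiffExample {
    public static void main(String[] args) {
        // Before the call: p0 has 3 records buffered, p1 has 7.
        int oldQueueSize = 3 + 7;      // partitionGroup.numBuffered() counts ALL partitions -> 10
        int newQueueSize = 3 + 5;      // addRawRecords(p0, ...) returns the size of p0 only -> 8
        int returned = newQueueSize - oldQueueSize;
        System.out.println(returned);  // prints -2, even though 5 records were actually added
    }
}
{code}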
This goes back to
{{org.apache.kafka.streams.processor.internals.StreamThread#runLoop}}:
{code:java}
for (TopicPartition partition : records.partitions()) {
    StreamTask task = activeTasksByPartition.get(partition);
    numAddedRecords += task.addRecords(partition, records.records(partition));
}
streamsMetrics.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs);
{code}
There it gets added to {{numAddedRecords}}, which actually decreases the value (because the return value is negative). This leads to a wrong sensor value in {{records.count() - numAddedRecords}}, because {{numAddedRecords}} is now smaller due to the bug, not because of an invalid timestamp.
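Continuing the hypothetical numbers from the sketch above, the sensor would then report skipped records even though every timestamp was valid:
{code:java}
// Continuing the hypothetical numbers from above.
public class SkippedRecordsExample {
    public static void main(String[] args) {
        int recordsCount = 5;        // records.count(): 5 records fetched, all with valid timestamps
        int numAddedRecords = -2;    // sum of addRecords(...) return values, negative due to the bug
        int skipped = recordsCount - numAddedRecords;
        System.out.println(skipped); // 7 "skipped" records reported, although nothing was skipped
    }
}
{code}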
BugFix proposal:
In the method
{{org.apache.kafka.streams.processor.internals.StreamTask#addRecords}}
change:
{{final int oldQueueSize = partitionGroup.numBuffered();}}
to
{{final int oldQueueSize = partitionGroup.numBuffered(partition);}}
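For clarity, a minimal sketch of what the fixed method could look like (assuming {{PartitionGroup}} exposes a per-partition {{numBuffered(TopicPartition)}} overload, as proposed above):
{code:java}
public int addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records) {
    // count only the records already buffered for THIS partition
    final int oldQueueSize = partitionGroup.numBuffered(partition);
    final int newQueueSize = partitionGroup.addRawRecords(partition, records);

    log.trace("{} Added records into the buffered queue of partition {}, new queue size is {}", logPrefix, partition, newQueueSize);

    // if after adding these records, its partition queue's buffered size has been
    // increased beyond the threshold, we can then pause the consumption for this partition
    if (newQueueSize > this.maxBufferedSize) {
        consumer.pause(singleton(partition));
    }

    // the difference is now the number of records actually added for this partition
    return newQueueSize - oldQueueSize;
}
{code}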
If this is the cause of the bug, can I do a pull request?
> Kafka Streams skipped-records-rate sensor producing nonzero values even when
> FailOnInvalidTimestamp is used as extractor
> ------------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-5055
> URL: https://issues.apache.org/jira/browse/KAFKA-5055
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Nikki Thean
> Assignee: Guozhang Wang
>
> According to the code and the documentation for this metric, the only reason
> for a skipped record is an invalid timestamp, except that a) I am reading
> from a topic that is populated solely by Kafka Connect and b) I am using
> `FailOnInvalidTimestamp` as the timestamp extractor.
> Either I'm missing something in the documentation (i.e. another reason for
> skipped records) or there is a bug in the code that calculates this metric.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)