[ https://issues.apache.org/jira/browse/KAFKA-6376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16418023#comment-16418023 ]
John Roesler edited comment on KAFKA-6376 at 3/28/18 7:55 PM:
--------------------------------------------------------------

Just doing a little initial archaeology...

The thread-level metric looks like it only counts this, in org.apache.kafka.streams.processor.internals.StreamTask#addRecords:
{quote}Adds records to queues. If a record has an invalid (i.e., negative) timestamp, the record is skipped and not added to the queue for processing{quote}
Ah, but it drills down into org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords, where we have
{quote}final ConsumerRecord<Object, Object> record = recordDeserializer.deserialize(processorContext, rawRecord);
if (record == null) { continue; }{quote}
AND
{quote}// drop message if TS is invalid, i.e., negative
if (timestamp < 0) { continue; }{quote}
All other code paths in there either add the record or throw.

The former of these two cases is accounted for in org.apache.kafka.streams.processor.internals.RecordDeserializer#deserialize:
{quote}... catch (final Exception deserializationException) {
  ...
  else {
    sourceNode.nodeMetrics.sourceNodeSkippedDueToDeserializationError.record();
    return null;
  }{quote}
All other paths either return a record or throw.
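To make the two drop paths above concrete, here is a minimal standalone sketch, NOT the actual Kafka code: `RawRecord`, `addRawRecords`, and the two counters are hypothetical stand-ins for the real `RecordQueue#addRawRecords` logic. A `null` payload simulates a failed deserialization (where `deserialize` returns `null` after recording `sourceNodeSkippedDueToDeserializationError`), and a negative timestamp triggers the second, currently unmetered, skip path.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the two skip paths in RecordQueue#addRawRecords:
// a record is dropped either because deserialization failed (returned null)
// or because its timestamp is invalid (negative).
public class SkipPathsSketch {

    // Stand-in for a raw record: a null payload simulates a deserialization failure.
    record RawRecord(String payload, long timestamp) {}

    // Mimics sourceNodeSkippedDueToDeserializationError.record() -- counted today.
    static int skippedDeserialization = 0;
    // Negative-timestamp skips -- per the comment above, NOT counted by any metric today.
    static int skippedTimestamp = 0;

    // Mimics RecordQueue#addRawRecords: returns only the records actually queued.
    static List<String> addRawRecords(List<RawRecord> rawRecords) {
        List<String> queue = new ArrayList<>();
        for (RawRecord raw : rawRecords) {
            String record = raw.payload; // stand-in for recordDeserializer.deserialize(...)
            if (record == null) {        // skip path 1: deserialization error
                skippedDeserialization++;
                continue;
            }
            if (raw.timestamp < 0) {     // skip path 2: invalid (negative) timestamp
                skippedTimestamp++;
                continue;
            }
            queue.add(record);
        }
        return queue;
    }

    public static void main(String[] args) {
        List<String> queued = addRawRecords(Arrays.asList(
                new RawRecord("a", 1L),
                new RawRecord(null, 2L),    // dropped: deserialization failure
                new RawRecord("b", -1L)));  // dropped: negative timestamp
        System.out.println(queued + " deser-skips=" + skippedDeserialization
                + " ts-skips=" + skippedTimestamp);
    }
}
```

Running this prints `[a] deser-skips=1 ts-skips=1`, illustrating that only the first skip path has a corresponding metric hook in the real code.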
> Improve Streams metrics for skipped records
> -------------------------------------------
>
>                 Key: KAFKA-6376
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6376
>             Project: Kafka
>          Issue Type: Improvement
>          Components: metrics, streams
>    Affects Versions: 1.0.0
>            Reporter: Matthias J. Sax
>            Assignee: John Roesler
>            Priority: Major
>              Labels: needs-kip
>
> Copied from the KIP-210 discussion thread:
> {quote}
> Note that currently we have two metrics for `skipped-records` on different levels:
> 1) On the highest level, the thread level, we have a `skipped-records` metric that records all the records skipped due to deserialization errors.
> 2) On the lower, processor-node level, we have a `skippedDueToDeserializationError` metric that records the records skipped on that specific source node due to deserialization errors.
> So you can see that 1) does not cover any other scenarios and can just be thought of as an aggregate of 2) across all the tasks' source nodes.
> However, there are other places that can cause a record to be dropped, for example:
> 1) https://issues.apache.org/jira/browse/KAFKA-5784: records could be dropped because their window has elapsed.
> 2) KIP-210: records could be dropped on the producer side.
> 3) Records could be dropped during user-customized processing on errors.
> {quote}
> [~guozhang] Not sure what you mean by "3) records could be dropped during user-customized processing on errors."
> Btw: we also drop records with {{null}} key and/or value for certain DSL operations. This should be included as well.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)