[ https://issues.apache.org/jira/browse/KAFKA-6376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16418023#comment-16418023 ]

John Roesler edited comment on KAFKA-6376 at 3/28/18 7:55 PM:
--------------------------------------------------------------

Just doing a little initial archaeology...

The thread-level metric appears to count only this:

org.apache.kafka.streams.processor.internals.StreamTask#addRecords:
{quote}Adds records to queues. If a record has an invalid (i.e., negative) 
timestamp, the record is skipped and not added to the queue for processing
{quote}

Ah, but it drills down into org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords, where we have:
{quote}final ConsumerRecord<Object, Object> record = recordDeserializer.deserialize(processorContext, rawRecord);
if (record == null) {
    continue;
}{quote}
AND
{quote}// drop message if TS is invalid, i.e., negative
if (timestamp < 0) {
    continue;
}{quote}
All other code paths in there either add the record or throw.
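
A minimal sketch of the two skip paths in RecordQueue#addRawRecords quoted above. The types SketchRecord and RecordQueueSketch, the deserializer function, and the two drop counters are hypothetical simplifications, not the actual Kafka Streams internals; the counters exist only to make each path observable and say nothing about which path the real code meters.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for a deserialized ConsumerRecord (not the Kafka type).
final class SketchRecord {
    final Object key;
    final Object value;
    final long timestamp;

    SketchRecord(Object key, Object value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Hypothetical simplification of the loop in RecordQueue#addRawRecords.
final class RecordQueueSketch {
    final ArrayDeque<SketchRecord> queue = new ArrayDeque<>();
    long droppedNullRecord = 0;        // illustrative: deserializer returned null
    long droppedNegativeTimestamp = 0; // illustrative: timestamp < 0

    // Enqueues each raw record that survives both checks; returns the queue size.
    int addRawRecords(List<byte[]> rawRecords, Function<byte[], SketchRecord> deserializer) {
        for (byte[] rawRecord : rawRecords) {
            SketchRecord record = deserializer.apply(rawRecord);
            if (record == null) {
                droppedNullRecord++;
                continue;
            }
            // drop message if TS is invalid, i.e., negative
            if (record.timestamp < 0) {
                droppedNegativeTimestamp++;
                continue;
            }
            queue.addLast(record);
        }
        return queue.size();
    }
}
```

Under these assumptions, feeding four raw records where one fails to deserialize and one carries a negative timestamp leaves two records in the queue, with each drop counter at 1.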

The former of these cases is accounted for in org.apache.kafka.streams.processor.internals.RecordDeserializer#deserialize:
{quote}...
catch (final Exception deserializationException) {
    ...
    else {
        sourceNode.nodeMetrics.sourceNodeSkippedDueToDeserializationError.record();
        return null;
    }{quote}
All other paths either return a record or throw.
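
A similar sketch of the skip branch in RecordDeserializer#deserialize. It assumes a hypothetical HandlerResponse enum standing in for the configured deserialization exception handler's decision, and an AtomicLong standing in for sourceNodeSkippedDueToDeserializationError. The real code throws on a FAIL response; IllegalStateException is used here only to keep the example self-contained.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Hypothetical stand-in for the exception handler's decision.
enum HandlerResponse { CONTINUE, FAIL }

// Hypothetical simplification of RecordDeserializer#deserialize.
final class DeserializerSketch<T> {
    private final Function<byte[], T> valueDeserializer;
    private final Function<Exception, HandlerResponse> handler;
    // Stands in for sourceNode.nodeMetrics.sourceNodeSkippedDueToDeserializationError
    final AtomicLong skippedDueToDeserializationError = new AtomicLong();

    DeserializerSketch(Function<byte[], T> valueDeserializer,
                       Function<Exception, HandlerResponse> handler) {
        this.valueDeserializer = valueDeserializer;
        this.handler = handler;
    }

    // Returns the deserialized value, or null when the record is skipped.
    T deserialize(byte[] rawRecord) {
        try {
            return valueDeserializer.apply(rawRecord);
        } catch (final Exception deserializationException) {
            if (handler.apply(deserializationException) == HandlerResponse.FAIL) {
                // Illustrative stand-in for the real code's Streams-specific exception.
                throw new IllegalStateException("deserialization failed", deserializationException);
            }
            // CONTINUE: count the skip on the source node, then drop the record.
            skippedDueToDeserializationError.incrementAndGet();
            return null;
        }
    }
}
```

This makes the point above concrete: a null return always coincides with one increment of the node-level counter, so the caller in addRawRecords never needs to count that case again.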

was (Author: vvcephei):
The thread-level metric appears to count only this:

org.apache.kafka.streams.processor.internals.StreamTask#addRecords:
{quote}Adds records to queues. If a record has an invalid (i.e., negative) 
timestamp, the record is skipped and not added to the queue for processing
{quote}

Ah, but it drills down into org.apache.kafka.streams.processor.internals.RecordQueue#addRawRecords, where we have:
{quote}final ConsumerRecord<Object, Object> record = 
recordDeserializer.deserialize(processorContext, rawRecord);
if (record == null) {
 continue;
}{quote}
AND
{quote}// drop message if TS is invalid, i.e., negative
if (timestamp < 0) {
 continue;
}{quote}
All other code paths in there either add the record or throw.

The former of these cases is accounted for in org.apache.kafka.streams.processor.internals.RecordDeserializer#deserialize:
{quote}...
catch (final Exception deserializationException) {{quote}

> Improve Streams metrics for skipped records
> -------------------------------------------
>
>                 Key: KAFKA-6376
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6376
>             Project: Kafka
>          Issue Type: Improvement
>          Components: metrics, streams
>    Affects Versions: 1.0.0
>            Reporter: Matthias J. Sax
>            Assignee: John Roesler
>            Priority: Major
>              Labels: needs-kip
>
> Copied from the KIP-210 discussion thread:
> {quote}
> Note that currently we have two metrics for `skipped-records` on different
> levels:
> 1) on the highest level, the thread-level, we have a `skipped-records`,
> that records all the skipped records due to deserialization errors.
> 2) on the lower processor-node level, we have a
> `skippedDueToDeserializationError`, that records the skipped records on
> that specific source node due to deserialization errors.
> So you can see that 1) does not cover any other scenarios and can just be
> thought of as an aggregate of 2) across all the tasks' source nodes.
> However, there are other places that can cause a record to be dropped, for
> example:
> 1) https://issues.apache.org/jira/browse/KAFKA-5784: records could be
> dropped due to window elapsed.
> 2) KIP-210: records could be dropped on the producer side.
> 3) records could be dropped during user-customized processing on errors.
> {quote}
> [~guozhang] Not sure what you mean by "3) records could be dropped during 
> user-customized processing on errors."
> Btw: we also drop records with {{null}} keys and/or values for certain DSL
> operations. This should be included as well.
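
To make the metric structure described in the issue concrete, here is a sketch of per-node, per-reason skip counters whose sum yields a thread-level total. SkipReason, SkippedRecordsSketch, and the node names used with them are hypothetical; this is neither the Kafka metrics API nor a proposed design, only an illustration of "thread-level = aggregate of node-level" extended to the other drop reasons mentioned above.

```java
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

// Hypothetical drop reasons, taken from the cases discussed in this issue.
enum SkipReason {
    DESERIALIZATION_ERROR, // RecordDeserializer#deserialize returned null
    NEGATIVE_TIMESTAMP,    // RecordQueue#addRawRecords: timestamp < 0
    WINDOW_ELAPSED,        // KAFKA-5784
    NULL_KEY_OR_VALUE      // certain DSL operations
}

// Hypothetical per-node, per-reason counters rolled up to a thread total.
final class SkippedRecordsSketch {
    // node name -> (reason -> count)
    private final Map<String, EnumMap<SkipReason, Long>> perNode = new HashMap<>();

    void recordSkip(String nodeName, SkipReason reason) {
        perNode.computeIfAbsent(nodeName, n -> new EnumMap<>(SkipReason.class))
               .merge(reason, 1L, Long::sum);
    }

    long nodeCount(String nodeName, SkipReason reason) {
        EnumMap<SkipReason, Long> counts = perNode.get(nodeName);
        return counts == null ? 0L : counts.getOrDefault(reason, 0L);
    }

    // Thread-level total: sum over every node and every reason.
    long threadTotal() {
        long total = 0L;
        for (EnumMap<SkipReason, Long> counts : perNode.values()) {
            for (long c : counts.values()) {
                total += c;
            }
        }
        return total;
    }
}
```

Keying each node's counters by reason keeps the existing node-level granularity while letting the thread-level figure cover every drop path, not just deserialization errors.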



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
