[ https://issues.apache.org/jira/browse/KAFKA-14659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hector Geraldino reassigned KAFKA-14659:
----------------------------------------

    Assignee: Hector Geraldino

> source-record-write-[rate|total] metrics include filtered records
> -----------------------------------------------------------------
>
>                 Key: KAFKA-14659
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14659
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Chris Beard
>            Assignee: Hector Geraldino
>            Priority: Minor
>
> Source tasks in Kafka Connect offer two sets of metrics (documented in 
> [ConnectMetricsRegistry.java|https://github.com/apache/kafka/blob/72cfc994f5675be349d4494ece3528efed290651/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java#L173-L191]):
> ||Metric||Description||
> |source-record-poll-rate|The average per-second number of records produced/polled (before transformation) by this task belonging to the named source connector in this worker.|
> |source-record-write-rate|The average per-second number of records output from the transformations and written to Kafka for this task belonging to the named source connector in this worker. This is after transformations are applied and excludes any records filtered out by the transformations.|
> There are also corresponding "-total" metrics ({{source-record-poll-total}} 
> and {{source-record-write-total}}) that capture the total number of records 
> polled and written, respectively.
> In short, the "poll" metrics count the records sourced before 
> transformation and filtering, while the "write" metrics should count only the 
> records ultimately written to Kafka after transformation and filtering. 
> However, the implementation of the {{source-record-write-*}} metrics 
> _includes_ records filtered out by transformations (as well as records whose 
> produce requests fail when {{errors.tolerance=all}} is configured).
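To make the documented semantics concrete, here is a minimal, simplified model. It is an illustrative assumption, not Connect code: records are plain {{Integer}}s, the transformation chain is a single {{UnaryOperator}}, and a transformation drops a record by returning {{null}}, which matches the contract of Connect's {{Transformation.apply}}. The class name {{DocumentedMetricsModel}} is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical model of the DOCUMENTED semantics (not Connect's implementation):
// "poll" counts every record returned by the task, "write" counts only the
// records that survive the transformation chain.
class DocumentedMetricsModel {
    long polled = 0;   // analogous to source-record-poll-total
    long written = 0;  // analogous to source-record-write-total

    List<Integer> process(List<Integer> batch, UnaryOperator<Integer> transform) {
        List<Integer> out = new ArrayList<>();
        for (Integer rec : batch) {
            polled++;                          // counted before transformation
            Integer transformed = transform.apply(rec);
            if (transformed == null) {
                continue;                      // filtered out: must NOT count as written
            }
            out.add(transformed);
            written++;                         // counted after transformation
        }
        return out;
    }
}
```

With the records 1 to 10 and a transform that drops even numbers, {{polled}} ends at 10 and {{written}} at 5; according to the documentation, {{source-record-write-total}} should reflect the 5.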
> h3. Details
> In 
> [AbstractWorkerSourceTask.java|https://github.com/apache/kafka/blob/a382acd31d1b53cd8695ff9488977566083540b1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L389-L397],
> each source record is passed through the transformation chain, where it may 
> be filtered out. The result is then checked, and if the record was filtered 
> out (or its processing failed), it is accounted for in the internal metrics 
> via {{counter.skipRecord()}}.
> {code:java}
> for (final SourceRecord preTransformRecord : toSend) {
>     retryWithToleranceOperator.sourceRecord(preTransformRecord);
>     // A filtering transformation returns null for records it drops.
>     final SourceRecord record = transformationChain.apply(preTransformRecord);
>     final ProducerRecord<byte[], byte[]> producerRecord = convertTransformedRecord(record);
>     // Filtered-out records and records whose processing failed are not sent...
>     if (producerRecord == null || retryWithToleranceOperator.failed()) {
>         // ...but they still decrement the batch's write counter.
>         counter.skipRecord();
>         recordDropped(preTransformRecord);
>         continue;
>     }
>     ...
> }
> {code}
> The relevant parts of {{SourceRecordWriteCounter}}, including 
> {{skipRecord()}}, are implemented as follows:
> {code:java}
>     ....
>     public SourceRecordWriteCounter(int batchSize, SourceTaskMetricsGroup metricsGroup) {
>         assert batchSize > 0;
>         assert metricsGroup != null;
>         this.batchSize = batchSize;
>         counter = batchSize;
>         this.metricsGroup = metricsGroup;
>     }
> 
>     public void skipRecord() {
>         // Each skipped record decrements the count of outstanding records.
>         if (counter > 0 && --counter == 0) {
>             finishedAllWrites();
>         }
>     }
>     ....
>     private void finishedAllWrites() {
>         if (!completed) {
>             // When reached from skipRecord(), counter is 0 here, so the
>             // full batchSize is recorded as written.
>             metricsGroup.recordWrite(batchSize - counter);
>             completed = true;
>         }
>     }
> {code}
> For example, if a batch starts with 100 records, {{batchSize}} and 
> {{counter}} are both initialized to 100. If all 100 records are filtered 
> out, {{counter}} is decremented 100 times, and {{finishedAllWrites()}} records 
> {{batchSize - counter}} = 100 - 0 = 100 to the underlying 
> {{source-record-write-*}} metrics rather than 0, which is the correct value 
> according to the documentation for these metrics.
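The arithmetic above can be reproduced with a standalone copy of the quoted counter logic. This is a sketch, not the Kafka class: the {{IntConsumer}} is a hypothetical stand-in for {{metricsGroup.recordWrite(int)}}, and only the constructor, {{skipRecord()}} and {{finishedAllWrites()}} are modeled, since the rest of the class is elided in the excerpt.

```java
import java.util.function.IntConsumer;

// Standalone reproduction of the quoted SourceRecordWriteCounter logic.
// recordWrite is a hypothetical stand-in for SourceTaskMetricsGroup.recordWrite(int).
class BuggyWriteCounter {
    private final int batchSize;
    private final IntConsumer recordWrite;
    private int counter;
    private boolean completed = false;

    BuggyWriteCounter(int batchSize, IntConsumer recordWrite) {
        this.batchSize = batchSize;
        this.counter = batchSize;
        this.recordWrite = recordWrite;
    }

    void skipRecord() {
        if (counter > 0 && --counter == 0) {
            finishedAllWrites();
        }
    }

    private void finishedAllWrites() {
        if (!completed) {
            // Only reached when counter == 0, so this always reports batchSize.
            recordWrite.accept(batchSize - counter);
            completed = true;
        }
    }

    public static void main(String[] args) {
        long[] reported = {0};
        BuggyWriteCounter counter = new BuggyWriteCounter(100, n -> reported[0] += n);
        for (int i = 0; i < 100; i++) {
            counter.skipRecord(); // every record in the batch is filtered out
        }
        System.out.println("reported as written: " + reported[0]); // prints "reported as written: 100"
    }
}
```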
> h3. Solutions
> Assuming the documentation correctly captures the intent of the 
> {{source-record-write-*}} metrics, it seems reasonable to fix these metrics 
> so that filtered records are not counted.
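One possible shape for such a fix, sketched under assumptions: the class name {{FixedWriteCounter}} and the {{IntConsumer}} standing in for {{metricsGroup.recordWrite}} are hypothetical, {{completeRecord()}} models a record acknowledged by the producer (the quoted excerpt elides that part of the class), and this is not necessarily the change made for KAFKA-14659. Skipped records still decrement the outstanding counter so the batch still completes, but they are tracked separately and subtracted from the reported count.

```java
import java.util.function.IntConsumer;

// Hypothetical fix sketch: skipped records finish the batch but are not
// reported as written.
class FixedWriteCounter {
    private final int batchSize;
    private final IntConsumer recordWrite; // stand-in for metricsGroup.recordWrite(int)
    private int counter;
    private int skipped = 0;
    private boolean completed = false;

    FixedWriteCounter(int batchSize, IntConsumer recordWrite) {
        if (batchSize <= 0) throw new IllegalArgumentException("batchSize must be positive");
        this.batchSize = batchSize;
        this.counter = batchSize;
        this.recordWrite = recordWrite;
    }

    /** Record filtered out by a transformation, or dropped after a tolerated failure. */
    void skipRecord() {
        if (counter > 0) {
            skipped++;
            if (--counter == 0) finishedAllWrites();
        }
    }

    /** Record acknowledged by the producer (assumed counterpart of skipRecord()). */
    void completeRecord() {
        if (counter > 0 && --counter == 0) finishedAllWrites();
    }

    private void finishedAllWrites() {
        if (!completed) {
            // Exclude skipped records (and any still outstanding) from the write count.
            recordWrite.accept(batchSize - counter - skipped);
            completed = true;
        }
    }
}
```

With this sketch, a 100-record batch that is entirely filtered reports 0, and a 10-record batch with 4 skipped and 6 acknowledged records reports 6.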
> It may also be useful to add metrics that capture the rate and total number 
> of records filtered out by transformations, which would require a KIP.
> I'm not yet sure what the best way is to account for produce failures when 
> {{errors.tolerance=all}} is set. Maybe these failures deserve their own 
> new metrics?
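If filtered records and tolerated produce failures were broken out as suggested, a per-batch tally might look like the hypothetical sketch below. The enum {{RecordOutcome}}, its constants, and the class {{OutcomeTally}} are assumptions for illustration only; any real metric names would be defined by a KIP.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical outcome categories for each polled record.
enum RecordOutcome { WRITTEN, FILTERED, PRODUCE_FAILED_TOLERATED }

// Hypothetical per-batch tally with one bucket per outcome.
class OutcomeTally {
    private final Map<RecordOutcome, Long> counts = new EnumMap<>(RecordOutcome.class);

    void record(RecordOutcome outcome) {
        counts.merge(outcome, 1L, Long::sum);
    }

    long get(RecordOutcome outcome) {
        return counts.getOrDefault(outcome, 0L);
    }

    /** If the caller records each polled record exactly once, this equals the poll count. */
    long total() {
        return counts.values().stream().mapToLong(Long::longValue).sum();
    }
}
```

Keeping the buckets disjoint means the write metric, a filtered metric, and a failure metric would always sum to the poll metric, which makes discrepancies like the one described here easy to spot.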



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
