[ https://issues.apache.org/jira/browse/KAFKA-14659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chris Egerton updated KAFKA-14659:
----------------------------------
Fix Version/s: 3.3.3

source-record-write-[rate|total] metrics include filtered records
-----------------------------------------------------------------

Key: KAFKA-14659
URL: https://issues.apache.org/jira/browse/KAFKA-14659
Project: Kafka
Issue Type: Bug
Components: KafkaConnect
Reporter: Chris Beard
Assignee: Hector Geraldino
Priority: Minor
Fix For: 3.5.0, 3.4.1, 3.3.3

Source tasks in Kafka Connect offer two sets of metrics (documented in [ConnectMetricsRegistry.java|https://github.com/apache/kafka/blob/72cfc994f5675be349d4494ece3528efed290651/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java#L173-L191]):
||Metric||Description||
|source-record-poll-rate|The average per-second number of records produced/polled (before transformation) by this task belonging to the named source connector in this worker.|
|source-record-write-rate|The average per-second number of records output from the transformations and written to Kafka for this task belonging to the named source connector in this worker. This is after transformations are applied and excludes any records filtered out by the transformations.|

There are also corresponding "-total" metrics that capture the total number of records polled and written, respectively.

In short, the "poll" metrics capture the number of records sourced before transformation/filtering. The "write" metrics should capture the number of records ultimately written to Kafka after transformation/filtering. However, the implementation of the {{source-record-write-*}} metrics _includes_ records filtered out by transformations. It also includes records that result in produce failures when {{errors.tolerance=all}} is configured.

h3. Details
In [AbstractWorkerSourceTask.java|https://github.com/apache/kafka/blob/a382acd31d1b53cd8695ff9488977566083540b1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L389-L397], each source record is passed through the transformation chain, where it may be filtered out. The task then checks whether the record was in fact filtered out. If so, the record is accounted for in the internal metrics via {{counter.skipRecord()}}.
{code:java}
for (final SourceRecord preTransformRecord : toSend) {
    retryWithToleranceOperator.sourceRecord(preTransformRecord);
    final SourceRecord record = transformationChain.apply(preTransformRecord);
    final ProducerRecord<byte[], byte[]> producerRecord = convertTransformedRecord(record);
    // A null producerRecord means a transformation filtered the record out;
    // failed() covers records dropped under errors.tolerance=all.
    if (producerRecord == null || retryWithToleranceOperator.failed()) {
        counter.skipRecord();
        recordDropped(preTransformRecord);
        continue;
    }
    // ...
}
{code}
{{SourceRecordWriteCounter.skipRecord()}} is implemented as follows:
{code:java}
// ...
public SourceRecordWriteCounter(int batchSize, SourceTaskMetricsGroup metricsGroup) {
    assert batchSize > 0;
    assert metricsGroup != null;
    this.batchSize = batchSize;
    counter = batchSize;
    this.metricsGroup = metricsGroup;
}

public void skipRecord() {
    // Only decrements the outstanding-record count; skipped records are not tracked separately.
    if (counter > 0 && --counter == 0) {
        finishedAllWrites();
    }
}
// ...
private void finishedAllWrites() {
    if (!completed) {
        // When reached via skipRecord(), counter is 0, so the full batchSize is recorded.
        metricsGroup.recordWrite(batchSize - counter);
        completed = true;
    }
}
{code}
For example, suppose a batch starts with 100 records, so {{batchSize}} and {{counter}} are both initialized to 100. If all 100 records are filtered out, {{counter}} is decremented 100 times. {{finishedAllWrites()}} then records {{batchSize - counter}}, which is 100 - 0 = 100, to the underlying {{source-record-write-*}} metrics. According to the documentation for these metrics, the correct value is 0.

h3. Solutions
Assuming the documentation correctly captures the intent of the {{source-record-write-*}} metrics, it seems reasonable to fix these metrics so that filtered records are not counted.

It may also be useful to add metrics that capture the rate and total number of records filtered out by transformations. That would require a KIP.

I'm not yet sure of the best way to account for produce failures when {{errors.tolerance=all}} is set. Maybe these failures deserve their own new metrics?
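The arithmetic in the example from the Details section can be checked with a small standalone model of the quoted counter logic. This is a simplified sketch, not the Kafka class, and it makes three assumptions:
* the class name {{BuggyWriteCounterModel}} is hypothetical;
* {{SourceTaskMetricsGroup}} is replaced by a plain list that remembers each value passed to {{recordWrite()}};
* only the {{skipRecord()}} path quoted in the issue is modeled.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the skipRecord() path quoted from
// SourceRecordWriteCounter. This is not the Kafka class itself.
class BuggyWriteCounterModel {
    // Stand-in for SourceTaskMetricsGroup: remembers each value passed to recordWrite().
    final List<Integer> recordedWrites = new ArrayList<>();

    private final int batchSize;
    private int counter;
    private boolean completed = false;

    BuggyWriteCounterModel(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.counter = batchSize;
    }

    // Same logic as the quoted skipRecord(): decrement the outstanding count and
    // finish the batch when it reaches 0. Skipped records are not tracked separately.
    void skipRecord() {
        if (counter > 0 && --counter == 0) {
            finishedAllWrites();
        }
    }

    private void finishedAllWrites() {
        if (!completed) {
            // counter is always 0 here in this model, so batchSize is recorded
            // no matter how many records were skipped.
            recordedWrites.add(batchSize - counter);
            completed = true;
        }
    }

    public static void main(String[] args) {
        BuggyWriteCounterModel batch = new BuggyWriteCounterModel(100);
        for (int i = 0; i < 100; i++) {
            batch.skipRecord(); // every record in the batch is filtered out
        }
        System.out.println(batch.recordedWrites); // prints [100], not [0]
    }
}
```

Running {{main}} prints {{[100]}}: a batch in which every record was filtered out is reported as 100 writes.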
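One possible shape for the fix proposed under Solutions is to count skipped records separately and subtract them before recording writes. The following is a hypothetical sketch under that assumption, not necessarily the change that was merged for this issue. The class {{FixedWriteCounterModel}} and its {{completeRecord()}} method (standing in for a producer acknowledgement of a written record) are illustrative names. Metrics are again modeled as a plain list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical corrected model: records passed to skipRecord() (filtered out, or
// dropped under errors.tolerance=all) are counted and excluded from the write metric.
class FixedWriteCounterModel {
    // Stand-in for SourceTaskMetricsGroup: remembers each value passed to recordWrite().
    final List<Integer> recordedWrites = new ArrayList<>();

    private final int batchSize;
    private int counter;      // records in the batch not yet accounted for
    private int skipped = 0;  // records that were never written to Kafka
    private boolean completed = false;

    FixedWriteCounterModel(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.counter = batchSize;
    }

    // A record was dropped before reaching the producer.
    void skipRecord() {
        if (counter > 0) {
            skipped++;
            if (--counter == 0) {
                finishedAllWrites();
            }
        }
    }

    // Hypothetical: a record was acknowledged as written to Kafka.
    void completeRecord() {
        if (counter > 0 && --counter == 0) {
            finishedAllWrites();
        }
    }

    private void finishedAllWrites() {
        if (!completed) {
            // Only records that were neither skipped nor still outstanding count as writes.
            recordedWrites.add(batchSize - counter - skipped);
            completed = true;
        }
    }

    public static void main(String[] args) {
        FixedWriteCounterModel allFiltered = new FixedWriteCounterModel(100);
        for (int i = 0; i < 100; i++) {
            allFiltered.skipRecord();
        }
        System.out.println(allFiltered.recordedWrites); // prints [0]

        FixedWriteCounterModel mixed = new FixedWriteCounterModel(100);
        for (int i = 0; i < 40; i++) {
            mixed.skipRecord();
        }
        for (int i = 0; i < 60; i++) {
            mixed.completeRecord();
        }
        System.out.println(mixed.recordedWrites); // prints [60]
    }
}
```

The {{- counter}} term is always 0 in this model, because {{finishedAllWrites()}} only runs once {{counter}} reaches 0. It is kept to mirror the original {{batchSize - counter}} expression.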