[
https://issues.apache.org/jira/browse/KAFKA-14659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Chris Egerton resolved KAFKA-14659.
-----------------------------------
Resolution: Fixed
> source-record-write-[rate|total] metrics include filtered records
> -----------------------------------------------------------------
>
> Key: KAFKA-14659
> URL: https://issues.apache.org/jira/browse/KAFKA-14659
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Reporter: Chris Beard
> Assignee: Hector Geraldino
> Priority: Minor
>
> Source tasks in Kafka Connect offer two sets of metrics (documented in
> [ConnectMetricsRegistry.java|https://github.com/apache/kafka/blob/72cfc994f5675be349d4494ece3528efed290651/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java#L173-L191]):
> ||Metric||Description||
> |source-record-poll-rate|The average per-second number of records
> produced/polled (before transformation) by this task belonging to the named
> source connector in this worker.|
> |source-record-write-rate|The average per-second number of records output
> from the transformations and written to Kafka for this task belonging to the
> named source connector in this worker. This is after transformations are
> applied and excludes any records filtered out by the transformations.|
> There are also corresponding "-total" metrics that capture the total number
> of records polled and written for the metrics above, respectively.
> In short, the "poll" metrics capture the number of messages sourced
> pre-transformation/filtering, and the "write" metrics should capture the
> number of messages ultimately written to Kafka post-transformation/filtering.
> However, the implementation of the {{source-record-write-*}} metrics
> _includes_ records filtered out by transformations (and also records that
> result in produce failures with the config {{errors.tolerance=all}}).
> h3. Details
> In
> [AbstractWorkerSourceTask.java|https://github.com/apache/kafka/blob/a382acd31d1b53cd8695ff9488977566083540b1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L389-L397],
> each source record is passed through the transformation chain, which may
> filter it out. The task then checks whether the record was in fact filtered
> out (or the operation failed) and, if so, accounts for it in the internal
> metrics via {{counter.skipRecord()}}.
> {code:java}
> for (final SourceRecord preTransformRecord : toSend) {
>     retryWithToleranceOperator.sourceRecord(preTransformRecord);
>     final SourceRecord record = transformationChain.apply(preTransformRecord);
>     final ProducerRecord<byte[], byte[]> producerRecord = convertTransformedRecord(record);
>     if (producerRecord == null || retryWithToleranceOperator.failed()) {
>         counter.skipRecord();
>         recordDropped(preTransformRecord);
>         continue;
>     }
>     ...
> {code}
> {{SourceRecordWriteCounter.skipRecord()}} is implemented as follows:
> {code:java}
> ....
> public SourceRecordWriteCounter(int batchSize, SourceTaskMetricsGroup metricsGroup) {
>     assert batchSize > 0;
>     assert metricsGroup != null;
>     this.batchSize = batchSize;
>     counter = batchSize;
>     this.metricsGroup = metricsGroup;
> }
> public void skipRecord() {
>     if (counter > 0 && --counter == 0) {
>         finishedAllWrites();
>     }
> }
> ....
> private void finishedAllWrites() {
>     if (!completed) {
>         metricsGroup.recordWrite(batchSize - counter);
>         completed = true;
>     }
> }
> {code}
> For example: If a batch starts with 100 records, {{batchSize}} and
> {{counter}} will both be initialized to 100. If all 100 records get filtered
> out, {{counter}} will be decremented 100 times, and
> {{finishedAllWrites()}} will record the value 100 to the underlying
> {{source-record-write-*}} metrics rather than 0, the correct value according
> to the documentation for these metrics.
> h3. Solutions
> Assuming the documentation correctly captures the intent of the
> {{source-record-write-*}} metrics, it seems reasonable to fix these metrics
> such that filtered records do not get counted.
> It may also be useful to add metrics that capture the rate and total number
> of records filtered out by transformations, which would require a KIP.
> I'm not yet sure of the best way to account for produce failures when
> {{errors.tolerance=all}} is set. Maybe these failures deserve their own
> new metrics?
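
The counting behavior described in the quoted issue can be reproduced outside
of Kafka with a small standalone sketch. This is not the committed fix and not
the real Connect class: QuotedCounter copies only the logic shown in the issue,
with the metrics group replaced by an IntConsumer, while SkipAwareCounter, its
completeRecord() method, and the helper methods are hypothetical names
introduced here to illustrate one possible way of keeping skipped records out
of the reported write count.

```java
import java.util.function.IntConsumer;

final class WriteCounterSketch {

    // Mirrors the counting logic quoted in the issue. The metrics group is
    // replaced by an IntConsumer (an assumption) so the sketch runs without Kafka.
    static final class QuotedCounter {
        private final int batchSize;
        private final IntConsumer recordWrite;
        private int counter;
        private boolean completed;

        QuotedCounter(int batchSize, IntConsumer recordWrite) {
            this.batchSize = batchSize;
            this.counter = batchSize;
            this.recordWrite = recordWrite;
        }

        void skipRecord() {
            if (counter > 0 && --counter == 0) {
                finishedAllWrites();
            }
        }

        private void finishedAllWrites() {
            if (!completed) {
                // counter is 0 here, so this reports batchSize even if
                // every record in the batch was skipped.
                recordWrite.accept(batchSize - counter);
                completed = true;
            }
        }
    }

    // Hypothetical alternative: track written records separately from the
    // number of records still outstanding in the batch.
    static final class SkipAwareCounter {
        private final IntConsumer recordWrite;
        private int remaining;
        private int written;
        private boolean completed;

        SkipAwareCounter(int batchSize, IntConsumer recordWrite) {
            this.remaining = batchSize;
            this.recordWrite = recordWrite;
        }

        void skipRecord() { settle(false); }

        void completeRecord() { settle(true); }

        private void settle(boolean wasWritten) {
            if (remaining <= 0) {
                return;
            }
            if (wasWritten) {
                written++;
            }
            if (--remaining == 0 && !completed) {
                recordWrite.accept(written);
                completed = true;
            }
        }
    }

    // Skips every record in the batch and returns the reported write count.
    static int quotedAllSkipped(int batchSize) {
        int[] reported = {-1};
        QuotedCounter c = new QuotedCounter(batchSize, n -> reported[0] = n);
        for (int i = 0; i < batchSize; i++) {
            c.skipRecord();
        }
        return reported[0];
    }

    // Skips the first `skipped` records, completes the rest, and returns
    // the reported write count.
    static int skipAware(int batchSize, int skipped) {
        int[] reported = {-1};
        SkipAwareCounter c = new SkipAwareCounter(batchSize, n -> reported[0] = n);
        for (int i = 0; i < batchSize; i++) {
            if (i < skipped) {
                c.skipRecord();
            } else {
                c.completeRecord();
            }
        }
        return reported[0];
    }

    public static void main(String[] args) {
        System.out.println(quotedAllSkipped(100)); // prints 100
        System.out.println(skipAware(100, 100));   // prints 0
        System.out.println(skipAware(100, 40));    // prints 60
    }
}
```

With a fully filtered batch of 100 records, the quoted logic reports 100
writes, matching the example in the issue, while the skip-aware sketch reports
0. How produce failures under errors.tolerance=all should be counted is left
open here, as it is in the issue.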
--
This message was sent by Atlassian Jira
(v8.20.10#820010)