Chris Beard created KAFKA-14659:
-----------------------------------
Summary: source-record-write-[rate|total] metrics include filtered
records
Key: KAFKA-14659
URL: https://issues.apache.org/jira/browse/KAFKA-14659
Project: Kafka
Issue Type: Bug
Components: KafkaConnect
Reporter: Chris Beard
Source tasks in Kafka Connect offer two sets of metrics (documented in
[ConnectMetricsRegistry.java|https://github.com/apache/kafka/blob/72cfc994f5675be349d4494ece3528efed290651/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java#L173-L191]):
||Metric||Description||
|source-record-poll-rate|The average per-second number of records produced/polled (before transformation) by this task belonging to the named source connector in this worker.|
|source-record-write-rate|The average per-second number of records output from the transformations and written to Kafka for this task belonging to the named source connector in this worker. This is after transformations are applied and excludes any records filtered out by the transformations.|
There are also corresponding "-total" metrics ({{source-record-poll-total}} and
{{source-record-write-total}}) that capture the total number of records polled
and written, respectively.
In short, the "poll" metrics capture the number of messages sourced
pre-transformation/filtering, and the "write" metrics should capture the number
of messages ultimately written to Kafka post-transformation/filtering. However,
the implementation of the {{source-record-write-*}} metrics _includes_ records
filtered out by transformations (and also records that result in produce
failures with the config {{errors.tolerance=all}}).
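To make the discrepancy concrete, here is a hypothetical per-batch model; the class and method names are illustrative and not part of Kafka Connect. It assumes a polled batch splits into written, filtered, and failed records, and that all three outcomes decrement the same per-batch counter, which is what the {{SourceRecordWriteCounter}} logic in the Details section suggests (the success path itself is not shown in the excerpts).

{code:java}
// Hypothetical model, not Kafka Connect code: contrasts what the documentation says
// source-record-write-* should count for one batch with what currently gets reported.
public class WriteMetricExpectation {

    // Documented meaning: only records that survive the transformations and are
    // successfully written to Kafka.
    static int documentedWrites(int polled, int filtered, int failed) {
        return polled - filtered - failed;
    }

    // Current behaviour, assuming every record in the batch (written, filtered, or
    // failed) decrements the same counter before the value is recorded.
    static int reportedWrites(int polled, int filtered, int failed) {
        int written = polled - filtered - failed;
        int counter = polled;                    // initialised to the batch size
        counter -= written + filtered + failed;  // every outcome decrements it
        return polled - counter;                 // batchSize - counter, recorded once counter hits 0
    }

    public static void main(String[] args) {
        System.out.println("documented=" + documentedWrites(100, 30, 5)
                + " reported=" + reportedWrites(100, 30, 5));
    }
}
{code}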
h3. Details
In
[AbstractWorkerSourceTask.java|https://github.com/apache/kafka/blob/a382acd31d1b53cd8695ff9488977566083540b1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L389-L397],
each source record is passed through the transformation chain, where it may be
filtered out. The result is then checked to see whether it was filtered out, and
if so, it is accounted for in the internal metrics via {{counter.skipRecord()}}.
{code:java}
for (final SourceRecord preTransformRecord : toSend) {
    retryWithToleranceOperator.sourceRecord(preTransformRecord);
    final SourceRecord record = transformationChain.apply(preTransformRecord);
    final ProducerRecord<byte[], byte[]> producerRecord = convertTransformedRecord(record);
    if (producerRecord == null || retryWithToleranceOperator.failed()) {
        counter.skipRecord();
        recordDropped(preTransformRecord);
        continue;
    }
    ...
{code}
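The control flow of this loop can be modelled without any Connect dependencies. In the sketch below, records are plain strings and the transformation chain is a {{UnaryOperator<String>}}; both are stand-ins chosen for illustration. It relies on the Connect convention that a transformation filters a record by returning {{null}}; in the quoted loop, such a record surfaces as a {{null}} {{producerRecord}} and takes the {{counter.skipRecord()}} path.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Simplified, hypothetical model of the send loop quoted above. Strings and
// UnaryOperator stand in for SourceRecord and the transformation chain.
public class SendLoopModel {

    // Records handed to the "producer" and how many took the skip path.
    record BatchResult(List<String> sent, int skipped) {}

    static BatchResult process(List<String> toSend, UnaryOperator<String> transformationChain) {
        List<String> sent = new ArrayList<>();
        int skipped = 0;
        for (String preTransformRecord : toSend) {
            // Returning null from the chain means "filter this record out".
            String record = transformationChain.apply(preTransformRecord);
            if (record == null) {
                skipped++;        // the real loop calls counter.skipRecord() here
                continue;
            }
            sent.add(record);     // the real loop sends the record to the producer here
        }
        return new BatchResult(sent, skipped);
    }

    public static void main(String[] args) {
        // Drop records starting with "debug" and upper-case the rest.
        UnaryOperator<String> chain = r -> r.startsWith("debug") ? null : r.toUpperCase();
        BatchResult result = process(List.of("debug-1", "order-1", "debug-2", "order-2"), chain);
        System.out.println(result.sent() + " skipped=" + result.skipped());
    }
}
{code}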
{{SourceRecordWriteCounter.skipRecord()}} is implemented as follows:
{code:java}
....
public SourceRecordWriteCounter(int batchSize, SourceTaskMetricsGroup metricsGroup) {
    assert batchSize > 0;
    assert metricsGroup != null;
    this.batchSize = batchSize;
    counter = batchSize;
    this.metricsGroup = metricsGroup;
}

public void skipRecord() {
    if (counter > 0 && --counter == 0) {
        finishedAllWrites();
    }
}
....
private void finishedAllWrites() {
    if (!completed) {
        metricsGroup.recordWrite(batchSize - counter);
        completed = true;
    }
}
{code}
For example: if a batch starts with 100 records, {{batchSize}} and {{counter}}
will both be initialized to 100. If all 100 records are filtered out,
{{counter}} will be decremented 100 times, and {{finishedAllWrites()}} will
record {{batchSize - counter}} = 100 - 0 = 100 to the underlying
{{source-record-write-*}} metrics rather than 0, which is the correct value
according to the documentation for these metrics.
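The arithmetic above can be checked with a standalone copy of the quoted counter logic. {{MetricsGroupStub}} is a hypothetical stand-in for {{SourceTaskMetricsGroup}} that only remembers the value passed to {{recordWrite}}; the constructor assertions are omitted.

{code:java}
// Standalone re-creation of the SourceRecordWriteCounter logic quoted above.
public class WriteCounterRepro {

    // Hypothetical stand-in for SourceTaskMetricsGroup.
    static class MetricsGroupStub {
        int lastRecordedWrites = -1;

        void recordWrite(int records) {
            lastRecordedWrites = records;
        }
    }

    static class SourceRecordWriteCounter {
        private final MetricsGroupStub metricsGroup;
        private final int batchSize;
        private boolean completed = false;
        private int counter;

        SourceRecordWriteCounter(int batchSize, MetricsGroupStub metricsGroup) {
            this.batchSize = batchSize;
            this.counter = batchSize;
            this.metricsGroup = metricsGroup;
        }

        // Same body as the quoted skipRecord(): a skip only decrements the counter.
        void skipRecord() {
            if (counter > 0 && --counter == 0) {
                finishedAllWrites();
            }
        }

        private void finishedAllWrites() {
            if (!completed) {
                // batchSize - counter counts every processed record, skipped ones included.
                metricsGroup.recordWrite(batchSize - counter);
                completed = true;
            }
        }
    }

    public static void main(String[] args) {
        MetricsGroupStub metrics = new MetricsGroupStub();
        SourceRecordWriteCounter counter = new SourceRecordWriteCounter(100, metrics);
        for (int i = 0; i < 100; i++) {
            counter.skipRecord(); // every record in the batch was filtered out
        }
        System.out.println("recordWrite called with " + metrics.lastRecordedWrites);
    }
}
{code}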
h3. Solutions
Assuming the documentation correctly captures the intent of the
{{source-record-write-*}} metrics, it seems reasonable to fix these metrics
such that filtered records do not get counted.
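One possible shape for such a fix, sketched under assumptions rather than taken from any actual patch: the counter keeps a separate {{skipped}} tally and subtracts it before calling {{recordWrite}}. {{MetricsGroupStub}} and {{completeRecord()}} are hypothetical here; the latter stands in for whatever path a successfully written record takes, which the excerpts above do not show.

{code:java}
// Hypothetical fix sketch, not the actual patch: SourceRecordWriteCounter with an
// extra "skipped" tally so skipped records are excluded from recordWrite(...).
public class FixedWriteCounter {

    // Hypothetical stand-in for SourceTaskMetricsGroup.
    static class MetricsGroupStub {
        int lastRecordedWrites = -1;

        void recordWrite(int records) {
            lastRecordedWrites = records;
        }
    }

    private final MetricsGroupStub metricsGroup;
    private final int batchSize;
    private boolean completed = false;
    private int counter;
    private int skipped = 0;

    FixedWriteCounter(int batchSize, MetricsGroupStub metricsGroup) {
        this.batchSize = batchSize;
        this.counter = batchSize;
        this.metricsGroup = metricsGroup;
    }

    // A filtered (or failed) record still finishes its slot in the batch,
    // but is remembered so it can be subtracted when the batch is recorded.
    void skipRecord() {
        if (counter > 0) {
            skipped++;
            if (--counter == 0) {
                finishedAllWrites();
            }
        }
    }

    // Hypothetical success path: a record that actually reached Kafka.
    void completeRecord() {
        if (counter > 0 && --counter == 0) {
            finishedAllWrites();
        }
    }

    private void finishedAllWrites() {
        if (!completed) {
            // Processed records minus skipped ones = records actually written.
            metricsGroup.recordWrite(batchSize - counter - skipped);
            completed = true;
        }
    }

    public static void main(String[] args) {
        MetricsGroupStub metrics = new MetricsGroupStub();
        FixedWriteCounter counter = new FixedWriteCounter(100, metrics);
        for (int i = 0; i < 30; i++) counter.skipRecord();
        for (int i = 0; i < 70; i++) counter.completeRecord();
        System.out.println("recordWrite called with " + metrics.lastRecordedWrites);
    }
}
{code}

With 30 filtered and 70 written records this reports 70 rather than 100, and a fully filtered batch reports 0.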
It may also be useful to add additional metrics to capture the rate and total
number of records filtered out by transformations, which would require a KIP.
I'm not yet sure of the best way to account for produce failures when
{{errors.tolerance=all}} is set. Maybe these failures deserve their own new
metrics?
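If filtered records and produce failures were given their own metrics, the per-batch bookkeeping could tally each outcome separately. This is a hypothetical sketch only: the {{Outcome}} enum and {{BatchOutcomeTally}} class are illustrative names, and no such metrics exist today (adding them would need a KIP, as noted above).

{code:java}
import java.util.EnumMap;
import java.util.Map;

// Hypothetical sketch: tally each record's outcome separately so that written,
// filtered, and failed records could feed distinct metrics.
public class BatchOutcomeTally {

    enum Outcome { WRITTEN, FILTERED, FAILED }

    private final Map<Outcome, Integer> counts = new EnumMap<>(Outcome.class);

    void add(Outcome outcome) {
        counts.merge(outcome, 1, Integer::sum);
    }

    int count(Outcome outcome) {
        return counts.getOrDefault(outcome, 0);
    }

    public static void main(String[] args) {
        BatchOutcomeTally tally = new BatchOutcomeTally();
        for (int i = 0; i < 65; i++) tally.add(Outcome.WRITTEN);
        for (int i = 0; i < 30; i++) tally.add(Outcome.FILTERED);
        for (int i = 0; i < 5; i++) tally.add(Outcome.FAILED);
        // Only WRITTEN would feed source-record-write-*; the others would need new metrics.
        System.out.println(tally.counts);
    }
}
{code}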