[jira] [Updated] (KAFKA-14659) source-record-write-[rate|total] metrics include filtered records

2023-02-28 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14659:
--
Fix Version/s: 3.3.3

> source-record-write-[rate|total] metrics include filtered records
> -
>
> Key: KAFKA-14659
> URL: https://issues.apache.org/jira/browse/KAFKA-14659
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Beard
>Assignee: Hector Geraldino
>Priority: Minor
> Fix For: 3.5.0, 3.4.1, 3.3.3
>
>
> Source tasks in Kafka Connect offer two sets of metrics (documented in 
> [ConnectMetricsRegistry.java|https://github.com/apache/kafka/blob/72cfc994f5675be349d4494ece3528efed290651/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java#L173-L191]):
> ||Metric||Description||
> |source-record-poll-rate|The average per-second number of records produced/polled (before transformation) by this task belonging to the named source connector in this worker.|
> |source-record-write-rate|The average per-second number of records output from the transformations and written to Kafka for this task belonging to the named source connector in this worker. This is after transformations are applied and excludes any records filtered out by the transformations.|
> There are also corresponding "-total" metrics that capture the total number 
> of records polled and written for the metrics above, respectively.
> In short, the "poll" metrics capture the number of messages sourced 
> pre-transformation/filtering, and the "write" metrics should capture the 
> number of messages ultimately written to Kafka post-transformation/filtering. 
> However, the implementation of the {{source-record-write-*}} metrics 
> _includes_ records filtered out by transformations (and also records that 
> result in produce failures with the config {{errors.tolerance=all}}).
> h3. Details
> In 
> [AbstractWorkerSourceTask.java|https://github.com/apache/kafka/blob/a382acd31d1b53cd8695ff9488977566083540b1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L389-L397],
>  each source record is passed through the transformation chain, where it may 
> be filtered out. The task then checks whether the record was in fact filtered 
> out and, if so, accounts for it in the internal metrics via 
> {{counter.skipRecord()}}.
> {code:java}
> for (final SourceRecord preTransformRecord : toSend) {
>     retryWithToleranceOperator.sourceRecord(preTransformRecord);
>     final SourceRecord record = transformationChain.apply(preTransformRecord);
>     final ProducerRecord producerRecord = convertTransformedRecord(record);
>     if (producerRecord == null || retryWithToleranceOperator.failed()) {
>         counter.skipRecord();
>         recordDropped(preTransformRecord);
>         continue;
>     }
>     ...
> {code}
> {{SourceRecordWriteCounter.skipRecord()}} is implemented as follows:
> {code:java}
> public SourceRecordWriteCounter(int batchSize, SourceTaskMetricsGroup metricsGroup) {
>     assert batchSize > 0;
>     assert metricsGroup != null;
>     this.batchSize = batchSize;
>     counter = batchSize;
>     this.metricsGroup = metricsGroup;
> }
>
> public void skipRecord() {
>     if (counter > 0 && --counter == 0) {
>         finishedAllWrites();
>     }
> }
>
> private void finishedAllWrites() {
>     if (!completed) {
>         metricsGroup.recordWrite(batchSize - counter);
>         completed = true;
>     }
> }
> {code}
> For example, if a batch starts with 100 records, {{batchSize}} and 
> {{counter}} will both be initialized to 100. If all 100 records get filtered 
> out, {{counter}} will be decremented 100 times, and 
> {{finishedAllWrites()}} will record the value 100 to the underlying 
> {{source-record-write-*}} metrics rather than 0, the correct value according 
> to the documentation for these metrics.
> h3. Solutions
> Assuming the documentation correctly captures the intent of the 
> {{source-record-write-*}} metrics, it seems reasonable to fix these metrics 
> so that filtered records are not counted.
> It may also be useful to add metrics that capture the rate and total 
> number of records filtered out by transformations, which would require a KIP.
> I'm not yet sure of the best way to account for produce failures in the case 
> of {{errors.tolerance=all}}. Maybe these failures deserve their own 
> new metrics?
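
The miscount described above can be reproduced outside Kafka with a small standalone sketch. Everything here is illustrative: {{FakeMetricsGroup}}, {{QuotedCounter}}, {{SkipAwareCounter}} and {{reportedWrites}} are hypothetical names, {{completeRecord()}} is assumed to decrement the counter the same way {{skipRecord()}} does, and the skip-aware variant is only one possible way to keep filtered records out of the reported value, not necessarily the fix that was merged.

```java
public class WriteCounterSketch {

    /** Hypothetical stand-in for SourceTaskMetricsGroup: sums what recordWrite() receives. */
    static class FakeMetricsGroup {
        long written = 0;

        void recordWrite(int recordCount) {
            written += recordCount;
        }
    }

    /** Mirrors the counter logic quoted in the issue description. */
    static class QuotedCounter {
        final int batchSize;
        final FakeMetricsGroup metricsGroup;
        int counter;
        boolean completed = false;

        QuotedCounter(int batchSize, FakeMetricsGroup metricsGroup) {
            this.batchSize = batchSize;
            this.counter = batchSize;
            this.metricsGroup = metricsGroup;
        }

        void skipRecord() {
            if (counter > 0 && --counter == 0) {
                finishedAllWrites();
            }
        }

        // Assumption: a successful write decrements the counter the same way.
        void completeRecord() {
            if (counter > 0 && --counter == 0) {
                finishedAllWrites();
            }
        }

        void finishedAllWrites() {
            if (!completed) {
                // counter is 0 here, so the whole batch size is reported.
                metricsGroup.recordWrite(batchSize - counter);
                completed = true;
            }
        }
    }

    /** Hypothetical variant: remembers skips and subtracts them before reporting. */
    static class SkipAwareCounter extends QuotedCounter {
        int skipped = 0;

        SkipAwareCounter(int batchSize, FakeMetricsGroup metricsGroup) {
            super(batchSize, metricsGroup);
        }

        @Override
        void skipRecord() {
            if (counter > 0) {
                skipped++;
                if (--counter == 0) {
                    finishedAllWrites();
                }
            }
        }

        @Override
        void finishedAllWrites() {
            if (!completed) {
                metricsGroup.recordWrite(batchSize - counter - skipped);
                completed = true;
            }
        }
    }

    /** Runs one batch in which the first skippedRecords records are filtered out. */
    static long reportedWrites(boolean skipAware, int batchSize, int skippedRecords) {
        FakeMetricsGroup metrics = new FakeMetricsGroup();
        QuotedCounter counter = skipAware
                ? new SkipAwareCounter(batchSize, metrics)
                : new QuotedCounter(batchSize, metrics);
        for (int i = 0; i < batchSize; i++) {
            if (i < skippedRecords) {
                counter.skipRecord();
            } else {
                counter.completeRecord();
            }
        }
        return metrics.written;
    }

    public static void main(String[] args) {
        System.out.println("quoted logic, 100 of 100 filtered: " + reportedWrites(false, 100, 100));
        System.out.println("skip-aware, 100 of 100 filtered: " + reportedWrites(true, 100, 100));
        System.out.println("skip-aware, 40 of 100 filtered: " + reportedWrites(true, 100, 40));
    }
}
```

With the quoted logic a fully filtered batch of 100 reports 100 writes, matching the example in the description, while the skip-aware variant reports 0 for that batch and 60 when only 40 records are filtered.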



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14659) source-record-write-[rate|total] metrics include filtered records

2023-02-28 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14659:
--
Fix Version/s: 3.4.1

> source-record-write-[rate|total] metrics include filtered records
> -
>
> Key: KAFKA-14659
> URL: https://issues.apache.org/jira/browse/KAFKA-14659
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Beard
>Assignee: Hector Geraldino
>Priority: Minor
> Fix For: 3.5.0, 3.4.1
>
>





[jira] [Updated] (KAFKA-14659) source-record-write-[rate|total] metrics include filtered records

2023-02-28 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-14659:
--
Fix Version/s: 3.5.0

> source-record-write-[rate|total] metrics include filtered records
> -
>
> Key: KAFKA-14659
> URL: https://issues.apache.org/jira/browse/KAFKA-14659
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Beard
>Assignee: Hector Geraldino
>Priority: Minor
> Fix For: 3.5.0
>
>





[jira] [Updated] (KAFKA-14659) source-record-write-[rate|total] metrics include filtered records

2023-01-27 Thread Chris Beard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Beard updated KAFKA-14659:

Priority: Minor  (was: Major)

> source-record-write-[rate|total] metrics include filtered records
> -
>
> Key: KAFKA-14659
> URL: https://issues.apache.org/jira/browse/KAFKA-14659
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Beard
>Priority: Minor
>


