[ 
https://issues.apache.org/jira/browse/KAFKA-14081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17568387#comment-17568387
 ] 

Gian Luca commented on KAFKA-14081:
-----------------------------------

This is my reporter implementation:
{code:java}
package kpm;

import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;

import java.util.List;
import java.util.Map;

public class TestMetricsReporter implements MetricsReporter {

    public TestMetricsReporter() {
        System.out.println("creation");
    }

    @Override
    public void init(List<KafkaMetric> list) {
        System.out.println("init");
        dump(list);
    }

    @Override
    public void metricChange(KafkaMetric kafkaMetric) {
        if (kafkaMetric.metricName().name().equals("request-total")) {
            System.out.println("metric change");
            dump(kafkaMetric);
        }
    }

    @Override
    public void metricRemoval(KafkaMetric kafkaMetric) {

    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }

    private void dump(List<KafkaMetric> list) {
        for (KafkaMetric m: list) {
            dump(m);
        }
    }

    private void dump(KafkaMetric m) {
        System.out.printf("%s/%s -> %s\n", m.metricName().name(), m.metricName().tags(), m.metricValue());
    }
} {code}
And this is the main class:
{code:java}
package kpm;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.Future;

public class Main {

    public static void main(String[] args) throws InterruptedException {
        Serializer<Void> keySerializer = (s, o) -> new byte[0];
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("topic", "source");
        properties.setProperty("metric.reporters", TestMetricsReporter.class.getCanonicalName());

        KafkaProducer<Void, String> producer = new KafkaProducer<>(
                properties,
                keySerializer,
                new StringSerializer());

        for (int i = 0; i < 1000; i++) {
            Future<?> f = producer.send(
                    new ProducerRecord<>("source", "{}"),
                    (recordMetadata, e) -> {
                        if (e == null)
                            System.out.println("SUCCESS");
                        else
                            System.out.println("FAIL");
                    }
            );
            Thread.sleep(1000);
        }

        Thread.sleep(5000);
        System.out.println("Closing...");
        producer.close();
    }
} {code}
On execution, the 'request-total' metric is reported once through 
metricChange(), with value 0.0. After that, no further updates arrive.
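
A possible workaround, sketched under the assumption that metricChange() fires only when a metric is registered or replaced and not on every value update: keep a live handle per metric and read it on demand, which is what polling {{metrics()}} effectively does. PollingRegistry and its methods are hypothetical names, not part of the Kafka API. In a real reporter the supplier would be kafkaMetric::metricValue; an AtomicLong stands in for it here so the sketch runs without a broker.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical helper, not part of the Kafka API: it keeps one live handle per
// metric at registration time and reads the current value only when asked.
class PollingRegistry {
    private final Map<String, Supplier<Object>> handles = new ConcurrentHashMap<>();

    // Intended to be called from init() and metricChange().
    // Assumption: with Kafka the supplier would be kafkaMetric::metricValue.
    void register(String name, Supplier<Object> currentValue) {
        handles.put(name, currentValue);
    }

    // Intended to be called from metricRemoval().
    void unregister(String name) {
        handles.remove(name);
    }

    // Reads every retained handle at call time and returns a sorted copy.
    Map<String, Object> snapshot() {
        Map<String, Object> out = new TreeMap<>();
        handles.forEach((name, value) -> out.put(name, value.get()));
        return out;
    }
}

class PollingRegistryDemo {
    public static void main(String[] args) {
        // Stand-in for a live Kafka counter such as 'request-total'.
        AtomicLong requestTotal = new AtomicLong();

        PollingRegistry registry = new PollingRegistry();
        registry.register("request-total", requestTotal::get);

        System.out.println(registry.snapshot()); // {request-total=0}
        requestTotal.addAndGet(3);
        System.out.println(registry.snapshot()); // {request-total=3}
    }
}
```

Calling snapshot() from a scheduled task (for example a ScheduledExecutorService started in configure() and stopped in close()) would give periodic exports without relying on metricChange() for value updates.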

> Cannot get my MetricsReporter implementation to receive meaningful metrics
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-14081
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14081
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 3.3.0
>            Reporter: Gian Luca
>            Priority: Minor
>
> I want to extract metrics from KafkaProducer to export them to our company 
> monitoring solution. At first I went for implementing {{MetricsReporter}} and 
> registering my implementation through the "metric.reporters" config property. 
> The class is correctly registered as it receives metric updates through 
> {{metricChange()}} while KafkaProducer is being used. The problem is that all 
> the metric values are stuck at zero (NaN in older versions of Kafka), even 
> the most trivial (e.g. 'record-send-total').
> If instead of using a reporter I simply poll the {{metrics()}} method of the 
> KafkaProducer, then I see meaningful values: counters increasing over time, 
> etc.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
