Hello,

yes, Flink can handle custom metrics. It is quite odd that you can see the system metrics but not your
own; I don't see a problem with your code.

How long does the job run? Since you create the metric in the flatMap operation, it is only exposed while that operation is active. Thus, if the operation takes less than 10 seconds
(the default interval for all reporters), the metric may never be reported.
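
To make the timing argument concrete, here is a minimal plain-Java sketch. It is a toy model, not Flink's reporter code: the class name ReporterTimingSketch and the method reportTimes are hypothetical, and it assumes the first report fires one full interval after the job starts. It lists the moments at which a periodic reporter would see a metric that exists only between a start and an end time.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a periodic metric reporter; NOT Flink's implementation. */
public class ReporterTimingSketch {

    /**
     * Returns the times (in ms) at which a reporter firing every intervalMs
     * would see a metric registered at startMs and unregistered at endMs.
     * Assumption: the first report happens one full interval after time 0.
     */
    public static List<Long> reportTimes(long startMs, long endMs, long intervalMs) {
        List<Long> seen = new ArrayList<>();
        for (long t = intervalMs; t < endMs; t += intervalMs) {
            if (t >= startMs) {
                seen.add(t); // the metric is alive when the reporter fires
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        long interval = 10_000; // 10 s, the default reporter interval mentioned above
        System.out.println(reportTimes(0, 4_000, interval));  // operator alive for 4 s
        System.out.println(reportTimes(0, 25_000, interval)); // operator alive for 25 s
    }
}
```

In this model the 4-second operator is never seen by the reporter, while the 25-second one is seen twice. A WordCount run over a small input may well finish inside a single interval, which would match the symptom described.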

Regards,
Chesnay

On 13.03.2017 08:48, Bowen Li wrote:
Hi guys,
     I'm trying to send some app-related custom metrics from Flink to
Datadog via StatsD.

     I followed
https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/metrics.html
to set up flink-conf.yaml and test code like this:

     // flink-conf.yaml

metrics.reporters: stsd
metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
metrics.reporter.stsd.host: localhost
metrics.reporter.stsd.port: 8125

metrics.scope.jm: bowen.jobmanager
metrics.scope.jm.job: bowen.jobmanager.bowen.job_name
metrics.scope.tm: bowen.taskmanager.<tm_id>

metrics.scope.tm.job: bowen.taskmanager.<tm_id>.<job_name>
metrics.scope.task: bowen.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
metrics.scope.operator: bowen.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>


       // test code, by modifying WordCount example


public static final class Tokenizer extends
RichFlatMapFunction<String, Tuple2<String, Integer>> {
    private static final long serialVersionUID = 1L;

    private Counter counter;

    @Override
    public void open(Configuration config) {
       getRuntimeContext()
          .getMetricGroup()
          .addGroup("bowen.test")
          .gauge("bowen.test.flink", new Gauge<Integer>() {
             @Override
             public Integer getValue() {
                return 100;
             }
          });  // test custom metrics

       counter = getRuntimeContext()
          .getMetricGroup()
          .addGroup("bowen.test")
          .counter("bowen.flink.metric"); // test custom metrics
       counter.inc(100);
    }

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
          throws Exception {
       // normalize and split the line
       String[] tokens = value.toLowerCase().split("\\W+");

       // emit the pairs
       for (String token : tokens) {
          if (token.length() > 0) {
             out.collect(new Tuple2<String, Integer>(token, 1));
          }
       }

       counter.inc(100);
    }
}

       I found that Datadog received all the system scope metrics, but none of my
custom metrics. I researched all night but made no progress. What did I do
wrong? Flink is able to handle custom metrics, right? I'd really appreciate
some guidance on sending custom metrics!

      Thank you very much!
Bowen

