Repository: flink
Updated Branches:
  refs/heads/master 6226108e8 -> 8195001bb


[FLINK-3950] Add Meter interface

This closes #2374


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8195001b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8195001b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8195001b

Branch: refs/heads/master
Commit: 8195001bbd54ca375b3f1e4e6f65974f061cdd44
Parents: 6226108
Author: Ivan Mushketyk <ivan.mushke...@gmail.com>
Authored: Mon Aug 15 08:02:57 2016 +0100
Committer: zentol <ches...@apache.org>
Committed: Mon Aug 29 18:48:06 2016 +0200

----------------------------------------------------------------------
 docs/monitoring/metrics.md                      | 51 ++++++++++++-
 .../java/org/apache/flink/metrics/Meter.java    | 51 +++++++++++++
 .../org/apache/flink/metrics/MetricGroup.java   | 20 +++++
 .../groups/UnregisteredMetricsGroup.java        | 11 +++
 .../metrics/reporter/AbstractReporter.java      |  6 ++
 .../apache/flink/metrics/util/TestMeter.java    | 42 +++++++++++
 .../dropwizard/ScheduledDropwizardReporter.java | 19 ++++-
 .../metrics/DropwizardHistogramWrapper.java     | 14 ++--
 .../metrics/DropwizardMeterWrapper.java         | 57 +++++++++++++++
 .../dropwizard/metrics/FlinkMeterWrapper.java   | 77 ++++++++++++++++++++
 .../ScheduledDropwizardReporterTest.java        |  9 ++-
 .../metrics/DropwizardMeterWrapperTest.java     | 59 +++++++++++++++
 .../metrics/FlinkMeterWrapperTest.java          | 64 ++++++++++++++++
 .../apache/flink/metrics/jmx/JMXReporter.java   | 28 +++++++
 .../flink/metrics/jmx/JMXReporterTest.java      | 42 +++++++++++
 .../flink/metrics/statsd/StatsDReporter.java    | 12 +++
 .../metrics/statsd/StatsDReporterTest.java      | 63 ++++++++++++++++
 .../metrics/groups/AbstractMetricGroup.java     | 12 +++
 .../metrics/groups/ProxyMetricGroup.java        | 11 +++
 19 files changed, 638 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8195001b/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 3a148e1..0e51407 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -34,7 +34,7 @@ This method returns a `MetricGroup` object on which you can 
create and register
 
 ### Metric types
 
-Flink supports `Counters`, `Gauges` and `Histograms`.
+Flink supports `Counters`, `Gauges`, `Histograms` and `Meters`.
 
 #### Counter
 
@@ -155,6 +155,55 @@ public class MyMapper extends RichMapFunction<Long, 
Integer> {
 }
 {% endhighlight %}
 
+#### Meter
+
+A `Meter` measures an average throughput. An occurrence of an event can be 
registered with the `markEvent()` method. Occurrence of multiple events at the 
same time can be registered with `markEvent(long n)` method.
+You can register a meter by calling `meter(String name, Meter meter)` on a 
`MetricGroup`.
+
+{% highlight java %}
+public class MyMapper extends RichMapFunction<Long, Integer> {
+  private Meter meter;
+
+  @Override
+  public void open(Configuration config) {
+    this.meter = getRuntimeContext()
+      .getMetricGroup()
+      .meter("myMeter", new MyMeter());
+  }
+
+  @public Integer map(Long value) throws Exception {
+    this.meter.markEvent();
+  }
+}
+{% endhighlight %}
+
+Flink offers a {% gh_link 
flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardMeterWrapper.java
 "Wrapper" %} that allows usage of Codahale/DropWizard meters.
+To use this wrapper add the following dependency in your `pom.xml`:
+{% highlight xml %}
+<dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-metrics-dropwizard</artifactId>
+      <version>{{site.version}}</version>
+</dependency>
+{% endhighlight %}
+
+You can then register a Codahale/DropWizard meter like this:
+
+{% highlight java %}
+public class MyMapper extends RichMapFunction<Long, Integer> {
+  private Meter meter;
+
+  @Override
+  public void open(Configuration config) {
+    com.codahale.metrics.Meter meter = new com.codahale.metrics.Meter();
+
+    this.meter = getRuntimeContext()
+      .getMetricGroup()
+      .meter("myMeter", new DropWizardMeterWrapper(meter));
+  }
+}
+{% endhighlight %}
+
 ## Scope
 
 Every metric is assigned an identifier under which it will be reported that is 
based on 3 components: the user-provided name when registering the metric, an 
optional user-defined scope and a system-provided scope.

http://git-wip-us.apache.org/repos/asf/flink/blob/8195001b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Meter.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Meter.java
 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Meter.java
new file mode 100644
index 0000000..f9cf742
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/Meter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics;
+
+/**
+ * Metric for measuring throughput.
+ */
+public interface Meter extends Metric {
+
+       /**
+        * Mark occurrence of an event.
+        */
+       void markEvent();
+
+       /**
+        * Mark occurrence of multiple events.
+        *
+        * @param n number of events occurred
+        */
+       void markEvent(long n);
+
+       /**
+        * Returns the current rate of events per second.
+        *
+        * @return current rate of events per second
+        */
+       double getRate();
+
+       /**
+        * Get number of events marked on the meter.
+        *
+        * @return number of events marked on the meter
+        */
+       long getCount();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8195001b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
index 95387ae..d4221ef 100644
--- 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
+++ 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/MetricGroup.java
@@ -110,6 +110,26 @@ public interface MetricGroup {
         */
        <H extends Histogram> H histogram(int name, H histogram);
 
+       /**
+        * Registers a new {@link Meter} with Flink.
+        *
+        * @param name name of the meter
+        * @param meter meter to register
+        * @param <M> meter type
+        * @return the registered meter
+        */
+       <M extends Meter> M meter(String name, M meter);
+
+       /**
+        * Registers a new {@link Meter} with Flink.
+        *
+        * @param name name of the meter
+        * @param meter meter to register
+        * @param <M> meter type
+        * @return the registered meter
+        */
+       <M extends Meter> M meter(int name, M meter);
+
        // 
------------------------------------------------------------------------
        // Groups
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8195001b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
index 3bbd0f6..ea11b43 100644
--- 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
+++ 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
@@ -22,6 +22,7 @@ import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
 
@@ -70,6 +71,16 @@ public class UnregisteredMetricsGroup implements MetricGroup 
{
        }
 
        @Override
+       public <M extends Meter> M meter(String name, M meter) {
+               return meter;
+       }
+
+       @Override
+       public <M extends Meter> M meter(int name, M meter) {
+               return meter;
+       }
+
+       @Override
        public <H extends Histogram> H histogram(String name, H histogram) {
                return histogram;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/8195001b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
index 7ab8c73..0c8d9ad 100644
--- 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
+++ 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/AbstractReporter.java
@@ -22,6 +22,7 @@ import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.slf4j.Logger;
@@ -39,6 +40,7 @@ public abstract class AbstractReporter implements 
MetricReporter, CharacterFilte
        protected final Map<Gauge<?>, String> gauges = new HashMap<>();
        protected final Map<Counter, String> counters = new HashMap<>();
        protected final Map<Histogram, String> histograms = new HashMap<>();
+       protected final Map<Meter, String> meters = new HashMap<>();
 
        @Override
        public void notifyOfAddedMetric(Metric metric, String metricName, 
MetricGroup group) {
@@ -51,6 +53,8 @@ public abstract class AbstractReporter implements 
MetricReporter, CharacterFilte
                                gauges.put((Gauge<?>) metric, name);
                        } else if (metric instanceof Histogram) {
                                histograms.put((Histogram) metric, name);
+                       } else if (metric instanceof Meter) {
+                               meters.put((Meter) metric, name);
                        } else {
                                log.warn("Cannot add unknown metric type {}. 
This indicates that the reporter " +
                                        "does not support this metric type.", 
metric.getClass().getName());
@@ -67,6 +71,8 @@ public abstract class AbstractReporter implements 
MetricReporter, CharacterFilte
                                gauges.remove(metric);
                        } else if (metric instanceof Histogram) {
                                histograms.remove(metric);
+                       } else if (metric instanceof Meter) {
+                               meters.remove(metric);
                        } else {
                                log.warn("Cannot remove unknown metric type {}. 
This indicates that the reporter " +
                                        "does not support this metric type.", 
metric.getClass().getName());

http://git-wip-us.apache.org/repos/asf/flink/blob/8195001b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/util/TestMeter.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/util/TestMeter.java
 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/util/TestMeter.java
new file mode 100644
index 0000000..b44b996
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/util/TestMeter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.util;
+
+import org.apache.flink.metrics.Meter;
+
+public class TestMeter implements Meter {
+
+       @Override
+       public void markEvent() {
+       }
+
+       @Override
+       public void markEvent(long n) {
+       }
+
+       @Override
+       public double getRate() {
+               return 5;
+       }
+
+       @Override
+       public long getCount() {
+               return 100L;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8195001b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
 
b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
index ce0299b..b7e83b6 100644
--- 
a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
+++ 
b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/ScheduledDropwizardReporter.java
@@ -23,14 +23,17 @@ import com.codahale.metrics.Reporter;
 import com.codahale.metrics.ScheduledReporter;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
 import org.apache.flink.dropwizard.metrics.FlinkCounterWrapper;
 import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
 import org.apache.flink.dropwizard.metrics.FlinkGaugeWrapper;
 import org.apache.flink.dropwizard.metrics.FlinkHistogramWrapper;
+import org.apache.flink.dropwizard.metrics.FlinkMeterWrapper;
 import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.MetricGroup;
@@ -67,6 +70,7 @@ public abstract class ScheduledDropwizardReporter implements 
MetricReporter, Sch
        private final Map<Gauge<?>, String> gauges = new HashMap<>();
        private final Map<Counter, String> counters = new HashMap<>();
        private final Map<Histogram, String> histograms = new HashMap<>();
+       private final Map<Meter, String> meters = new HashMap<>();
 
        // 
------------------------------------------------------------------------
 
@@ -83,6 +87,10 @@ public abstract class ScheduledDropwizardReporter implements 
MetricReporter, Sch
                return counters;
        }
 
+       Map<Meter, String> getMeters() {
+               return meters;
+       }
+
        // 
------------------------------------------------------------------------
        //  life cycle
        // 
------------------------------------------------------------------------
@@ -118,10 +126,19 @@ public abstract class ScheduledDropwizardReporter 
implements MetricReporter, Sch
                                histograms.put(histogram, fullName);
 
                                if (histogram instanceof 
DropwizardHistogramWrapper) {
-                                       registry.register(fullName, 
((DropwizardHistogramWrapper) histogram).getDropwizarHistogram());
+                                       registry.register(fullName, 
((DropwizardHistogramWrapper) histogram).getDropwizardHistogram());
                                } else {
                                        registry.register(fullName, new 
FlinkHistogramWrapper(histogram));
                                }
+                       } else if (metric instanceof Meter) {
+                               Meter meter = (Meter) metric;
+                               meters.put(meter, fullName);
+
+                               if (meter instanceof DropwizardMeterWrapper) {
+                                       registry.register(fullName, 
((DropwizardMeterWrapper) meter).getDropwizardMeter());
+                               } else {
+                                       registry.register(fullName, new 
FlinkMeterWrapper(meter));
+                               }
                        } else {
                                log.warn("Cannot add metric of type {}. This 
indicates that the reporter " +
                                        "does not support this metric type.", 
metric.getClass().getName());

http://git-wip-us.apache.org/repos/asf/flink/blob/8195001b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java
 
b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java
index 79a6a56..25f7701 100644
--- 
a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java
+++ 
b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardHistogramWrapper.java
@@ -26,28 +26,28 @@ import org.apache.flink.metrics.HistogramStatistics;
  */
 public class DropwizardHistogramWrapper implements Histogram {
 
-       private final com.codahale.metrics.Histogram dropwizarHistogram;
+       private final com.codahale.metrics.Histogram dropwizardHistogram;
 
        public DropwizardHistogramWrapper(com.codahale.metrics.Histogram 
dropwizardHistogram) {
-               this.dropwizarHistogram = dropwizardHistogram;
+               this.dropwizardHistogram = dropwizardHistogram;
        }
 
-       public com.codahale.metrics.Histogram getDropwizarHistogram() {
-               return dropwizarHistogram;
+       public com.codahale.metrics.Histogram getDropwizardHistogram() {
+               return dropwizardHistogram;
        }
 
        @Override
        public void update(long value) {
-               dropwizarHistogram.update(value);
+               dropwizardHistogram.update(value);
        }
 
        @Override
        public long getCount() {
-               return dropwizarHistogram.getCount();
+               return dropwizardHistogram.getCount();
        }
 
        @Override
        public HistogramStatistics getStatistics() {
-               return new 
DropwizardHistogramStatistics(dropwizarHistogram.getSnapshot());
+               return new 
DropwizardHistogramStatistics(dropwizardHistogram.getSnapshot());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8195001b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardMeterWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardMeterWrapper.java
 
b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardMeterWrapper.java
new file mode 100644
index 0000000..4f6fefe
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/DropwizardMeterWrapper.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dropwizard.metrics;
+
+import org.apache.flink.metrics.Meter;
+
+/**
+ * Wrapper to use a Dropwizard {@link com.codahale.metrics.Meter} as a Flink 
{@link Meter}.
+ */
+public class DropwizardMeterWrapper implements Meter {
+
+       private final com.codahale.metrics.Meter meter;
+
+       public DropwizardMeterWrapper(com.codahale.metrics.Meter meter) {
+               this.meter = meter;
+       }
+
+       public com.codahale.metrics.Meter getDropwizardMeter() {
+               return meter;
+       }
+
+       @Override
+       public void markEvent() {
+               meter.mark();
+       }
+
+       @Override
+       public void markEvent(long n) {
+               meter.mark(n);
+       }
+
+       @Override
+       public double getRate() {
+               return meter.getOneMinuteRate();
+       }
+
+       @Override
+       public long getCount() {
+               return meter.getCount();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8195001b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapper.java
 
b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapper.java
new file mode 100644
index 0000000..d0b8483
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapper.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dropwizard.metrics;
+
+import com.codahale.metrics.Clock;
+import org.apache.flink.metrics.Meter;
+
+/**
+ * Wrapper to use a Flink {@link Meter} as a Dropwizard {@link 
com.codahale.metrics.Meter}.
+ * This is necessary to report Flink's meters via the Dropwizard
+ * {@link com.codahale.metrics.Reporter}.
+ */
+public class FlinkMeterWrapper extends com.codahale.metrics.Meter {
+
+       private final Meter meter;
+
+       public FlinkMeterWrapper(Meter meter) {
+               super();
+               this.meter = meter;
+       }
+
+       public FlinkMeterWrapper(Meter meter, Clock clock) {
+               super(clock);
+               this.meter = meter;
+       }
+
+       @Override
+       public void mark() {
+               meter.markEvent();
+       }
+
+       @Override
+       public void mark(long n) {
+               meter.markEvent(n);
+       }
+
+       @Override
+       public long getCount() {
+               return meter.getCount();
+       }
+
+       @Override
+       public double getOneMinuteRate() {
+               return meter.getRate();
+       }
+
+       @Override
+       public double getFiveMinuteRate() {
+               return 0;
+       }
+
+       @Override
+       public double getFifteenMinuteRate() {
+               return 0;
+       }
+
+       @Override
+       public double getMeanRate() {
+               return 0;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8195001b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
index 5979c43..1440028 100644
--- 
a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
+++ 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
@@ -22,7 +22,9 @@ import com.codahale.metrics.ScheduledReporter;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.metrics.reporter.MetricReporter;
@@ -85,8 +87,11 @@ public class ScheduledDropwizardReporterTest {
                TaskMetricGroup taskMetricGroup = new 
TaskMetricGroup(metricRegistry, tmJobMetricGroup, new AbstractID(), new 
AbstractID(), taskName, 0, 0);
 
                SimpleCounter myCounter = new SimpleCounter();
+               com.codahale.metrics.Meter dropwizardMeter = new 
com.codahale.metrics.Meter();
+               DropwizardMeterWrapper meterWrapper = new 
DropwizardMeterWrapper(dropwizardMeter);
 
                taskMetricGroup.counter(counterName, myCounter);
+               taskMetricGroup.meter("meter", meterWrapper);
 
                List<MetricReporter> reporters = metricRegistry.getReporters();
 
@@ -98,9 +103,11 @@ public class ScheduledDropwizardReporterTest {
                TestingScheduledDropwizardReporter reporter = 
(TestingScheduledDropwizardReporter) metricReporter;
 
                Map<Counter, String> counters = reporter.getCounters();
-
                assertTrue(counters.containsKey(myCounter));
 
+               Map<Meter, String> meters = reporter.getMeters();
+               assertTrue(meters.containsKey(meterWrapper));
+
                String expectedCounterName = reporter.filterCharacters(hostname)
                        + delimiter
                        + reporter.filterCharacters(taskManagerId)

http://git-wip-us.apache.org/repos/asf/flink/blob/8195001b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardMeterWrapperTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardMeterWrapperTest.java
 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardMeterWrapperTest.java
new file mode 100644
index 0000000..0b8fa52
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardMeterWrapperTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dropwizard.metrics;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class DropwizardMeterWrapperTest {
+
+       @Test
+       public void testWrapper() {
+               com.codahale.metrics.Meter dropwizardMeter = 
mock(com.codahale.metrics.Meter.class);
+               when(dropwizardMeter.getOneMinuteRate()).thenReturn(1.0);
+               when(dropwizardMeter.getCount()).thenReturn(100L);
+
+               DropwizardMeterWrapper wrapper = new 
DropwizardMeterWrapper(dropwizardMeter);
+
+               assertEquals(1.0, wrapper.getRate(), 0.00001);
+               assertEquals(100L, wrapper.getCount());
+       }
+
+       @Test
+       public void testMarkEvent() {
+               com.codahale.metrics.Meter dropwizardMeter = 
mock(com.codahale.metrics.Meter.class);
+               DropwizardMeterWrapper wrapper = new 
DropwizardMeterWrapper(dropwizardMeter);
+               wrapper.markEvent();
+
+               verify(dropwizardMeter).mark();
+       }
+
+       @Test
+       public void testMarkEventN() {
+               com.codahale.metrics.Meter dropwizardMeter = 
mock(com.codahale.metrics.Meter.class);
+               DropwizardMeterWrapper wrapper = new 
DropwizardMeterWrapper(dropwizardMeter);
+               wrapper.markEvent(10L);
+
+               verify(dropwizardMeter).mark(10L);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8195001b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapperTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapperTest.java
 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapperTest.java
new file mode 100644
index 0000000..b6389c5
--- /dev/null
+++ 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/FlinkMeterWrapperTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.dropwizard.metrics;
+
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.util.TestMeter;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class FlinkMeterWrapperTest {
+
+       private static final double DELTA = 0.0001;
+
+       @Test
+       public void testWrapper() {
+               Meter meter = new TestMeter();
+
+               FlinkMeterWrapper wrapper = new FlinkMeterWrapper(meter);
+               assertEquals(0, wrapper.getMeanRate(), DELTA);
+               assertEquals(5, wrapper.getOneMinuteRate(), DELTA);
+               assertEquals(0, wrapper.getFiveMinuteRate(), DELTA);
+               assertEquals(0, wrapper.getFifteenMinuteRate(), DELTA);
+               assertEquals(100L, wrapper.getCount());
+       }
+
+       @Test
+       public void testMarkOneEvent() {
+               Meter meter = mock(Meter.class);
+
+               FlinkMeterWrapper wrapper = new FlinkMeterWrapper(meter);
+               wrapper.mark();
+
+               verify(meter).markEvent();
+       }
+
+       @Test
+       public void testMarkSeveralEvents() {
+               Meter meter = mock(Meter.class);
+
+               FlinkMeterWrapper wrapper = new FlinkMeterWrapper(meter);
+               wrapper.mark(5);
+
+               verify(meter).markEvent(5);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8195001b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
 
b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
index 1a283d9..39a5aa2 100644
--- 
a/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
+++ 
b/flink-metrics/flink-metrics-jmx/src/main/java/org/apache/flink/metrics/jmx/JMXReporter.java
@@ -21,6 +21,7 @@ package org.apache.flink.metrics.jmx;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.MetricGroup;
@@ -160,6 +161,8 @@ public class JMXReporter implements MetricReporter {
                        jmxMetric = new JmxCounter((Counter) metric);
                } else if (metric instanceof Histogram) {
                        jmxMetric = new JmxHistogram((Histogram) metric);
+               } else if (metric instanceof Meter) {
+                       jmxMetric = new JmxMeter((Meter) metric);
                } else {
                        LOG.error("Cannot add unknown metric type: {}. This 
indicates that the metric type " +
                                "is not supported by this reporter.", 
metric.getClass().getName());
@@ -417,6 +420,31 @@ public class JMXReporter implements MetricReporter {
                }
        }
 
+       public interface JmxMeterMBean extends MetricMBean {
+               double getRate();
+
+               long getCount();
+       }
+
+       private class JmxMeter extends AbstractBean implements JmxMeterMBean {
+
+               private final Meter meter;
+
+               public JmxMeter(Meter meter) {
+                       this.meter = meter;
+               }
+
+               @Override
+               public double getRate() {
+                       return meter.getRate();
+               }
+
+               @Override
+               public long getCount() {
+                       return meter.getCount();
+               }
+       }
+
        /**
         * JMX Server implementation that JMX clients can connect to.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/8195001b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
index 14ba5ec..913999b 100644
--- 
a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
+++ 
b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.util.TestMeter;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
@@ -245,6 +246,47 @@ public class JMXReporterTest extends TestLogger {
                }
        }
 
+       /**
+        * Tests that meters are properly reported via the JMXReporter.
+        */
+       @Test
+       public void testMeterReporting() throws Exception {
+               MetricRegistry registry = null;
+               String meterName = "meter";
+
+               try {
+                       Configuration config = new Configuration();
+                       
config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "jmx_test");
+                       
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx_test." + 
ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
+
+                       registry = new MetricRegistry(config);
+
+                       TaskManagerMetricGroup metricGroup = new 
TaskManagerMetricGroup(registry, "localhost", "tmId");
+
+                       TestMeter meter = new TestMeter();
+
+                       metricGroup.meter(meterName, meter);
+
+                       MBeanServer mBeanServer = 
ManagementFactory.getPlatformMBeanServer();
+
+                       ObjectName objectName = new 
ObjectName(JMXReporter.generateJmxName(meterName, 
metricGroup.getScopeComponents()));
+
+                       MBeanInfo info = mBeanServer.getMBeanInfo(objectName);
+
+                       MBeanAttributeInfo[] attributeInfos = 
info.getAttributes();
+
+                       assertEquals(2, attributeInfos.length);
+
+                       assertEquals(meter.getRate(),  
mBeanServer.getAttribute(objectName, "Rate"));
+                       assertEquals(meter.getCount(), 
mBeanServer.getAttribute(objectName, "Count"));
+
+               } finally {
+                       if (registry != null) {
+                               registry.shutdown();
+                       }
+               }
+       }
+
        static class TestingHistogram implements Histogram {
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8195001b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
 
b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
index 354eff1..977d1b4 100644
--- 
a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
+++ 
b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
@@ -23,6 +23,7 @@ import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.reporter.AbstractReporter;
 import org.apache.flink.metrics.reporter.Scheduled;
@@ -118,6 +119,10 @@ public class StatsDReporter extends AbstractReporter 
implements Scheduled {
                        for (Map.Entry<Histogram, String> entry : 
histograms.entrySet()) {
                                reportHistogram(entry.getValue(), 
entry.getKey());
                        }
+
+                       for (Map.Entry<Meter, String> entry : 
meters.entrySet()) {
+                               reportMeter(entry.getValue(), entry.getKey());
+                       }
                }
                catch (ConcurrentModificationException | NoSuchElementException 
e) {
                        // ignore - may happen when metrics are concurrently 
added or removed
@@ -159,6 +164,13 @@ public class StatsDReporter extends AbstractReporter 
implements Scheduled {
                }
        }
 
+       private void reportMeter(final String name, final Meter meter) {
+               if (meter != null) {
+                       send(prefix(name, "rate"), 
String.valueOf(meter.getRate()));
+                       send(prefix(name, "count"), 
String.valueOf(meter.getCount()));
+               }
+       }
+
        private String prefix(String ... names) {
                if (names.length > 0) {
                        StringBuilder stringBuilder = new 
StringBuilder(names[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/8195001b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
 
b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
index 8c1af0e..ad982f1 100644
--- 
a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
+++ 
b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.util.TestMeter;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.metrics.reporter.MetricReporter;
@@ -195,6 +196,68 @@ public class StatsDReporterTest extends TestLogger {
        }
 
        /**
+        * Tests that meters are properly reported via the StatsD reporter
+        */
+       @Test
+       public void testStatsDMetersReporting() throws Exception {
+               MetricRegistry registry = null;
+               DatagramSocketReceiver receiver = null;
+               Thread receiverThread = null;
+               long timeout = 5000;
+               long joinTimeout = 30000;
+
+               String meterName = "meter";
+
+               try {
+                       receiver = new DatagramSocketReceiver();
+
+                       receiverThread = new Thread(receiver);
+
+                       receiverThread.start();
+
+                       int port = receiver.getPort();
+
+                       Configuration config = new Configuration();
+                       
config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test");
+                       
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + 
ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, StatsDReporter.class.getName());
+                       
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + 
ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 SECONDS");
+                       
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.host", 
"localhost");
+                       
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.port", "" + 
port);
+
+                       registry = new MetricRegistry(config);
+                       TaskManagerMetricGroup metricGroup = new 
TaskManagerMetricGroup(registry, "localhost", "tmId");
+                       TestMeter meter = new TestMeter();
+                       metricGroup.meter(meterName, meter);
+                       String prefix = 
metricGroup.getMetricIdentifier(meterName);
+
+                       Set<String> expectedLines = new HashSet<>();
+
+                       expectedLines.add(prefix + ".rate:5.0|g");
+                       expectedLines.add(prefix + ".count:100|g");
+
+                       receiver.waitUntilNumLines(expectedLines.size(), 
timeout);
+
+                       Set<String> lines = receiver.getLines();
+
+
+                       assertEquals(expectedLines, lines);
+
+               } finally {
+                       if (registry != null) {
+                               registry.shutdown();
+                       }
+
+                       if (receiver != null) {
+                               receiver.stop();
+                       }
+
+                       if (receiverThread != null) {
+                               receiverThread.join(joinTimeout);
+                       }
+               }
+       }
+
+       /**
         * Testing StatsDReporter which disables the socket creation
         */
        public static class TestingStatsDReporter extends StatsDReporter {

http://git-wip-us.apache.org/repos/asf/flink/blob/8195001b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index ef2bc0c..89fe3cd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -22,6 +22,7 @@ import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
@@ -231,6 +232,17 @@ public abstract class AbstractMetricGroup<A extends 
AbstractMetricGroup<?>> impl
                return histogram;
        }
 
+       @Override
+       public <M extends Meter> M meter(int name, M meter) {
+               return meter(String.valueOf(name), meter);
+       }
+
+       @Override
+       public <M extends Meter> M meter(String name, M meter) {
+               addMetric(name, meter);
+               return meter;
+       }
+
        /**
         * Adds the given metric to the group and registers it at the registry, 
if the group
         * is not yet closed, and if no metric with the same name has been 
registered before.

http://git-wip-us.apache.org/repos/asf/flink/blob/8195001b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java
index 633c4af..0ebf749 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java
@@ -22,6 +22,7 @@ import org.apache.flink.metrics.CharacterFilter;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.MetricGroup;
 
 import java.util.Map;
@@ -82,6 +83,16 @@ public class ProxyMetricGroup<P extends MetricGroup> 
implements MetricGroup {
        }
 
        @Override
+       public <M extends Meter> M meter(String name, M meter) {
+               return parentMetricGroup.meter(name, meter);
+       }
+
+       @Override
+       public <M extends Meter> M meter(int name, M meter) {
+               return parentMetricGroup.meter(name, meter);
+       }
+
+       @Override
        public final MetricGroup addGroup(int name) {
                return parentMetricGroup.addGroup(name);
        }

Reply via email to