Repository: flink
Updated Branches:
  refs/heads/master 5dc624bf8 -> 85b53444a


[FLINK-4564] [metrics] Configurable delimiter per reporter

This closes #2517.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/85b53444
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/85b53444
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/85b53444

Branch: refs/heads/master
Commit: 85b53444a852027714904a050a68489740b7f2f8
Parents: 5dc624b
Author: Anton Mushin <anton_mus...@epam.com>
Authored: Tue Sep 20 14:09:40 2016 +0400
Committer: zentol <ches...@apache.org>
Committed: Fri Oct 14 15:49:26 2016 +0200

----------------------------------------------------------------------
 docs/monitoring/metrics.md                      |  1 +
 .../flink/configuration/ConfigConstants.java    |  3 +
 .../flink/runtime/metrics/MetricRegistry.java   | 54 +++++++++++---
 .../metrics/groups/AbstractMetricGroup.java     | 19 +++++
 .../metrics/groups/FrontMetricGroup.java        | 49 ++++++++++++
 .../metrics/groups/ProxyMetricGroup.java        |  2 +-
 .../runtime/metrics/MetricRegistryTest.java     | 78 ++++++++++++++++++++
 .../runtime/metrics/groups/MetricGroupTest.java |  4 +-
 .../metrics/groups/TaskMetricGroupTest.java     |  5 +-
 9 files changed, 198 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/85b53444/docs/monitoring/metrics.md
----------------------------------------------------------------------
diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 6de5b5e..cb756c5 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -280,6 +280,7 @@ Metrics can be exposed to an external system by configuring 
one or several repor
 - `metrics.reporter.<name>.<config>`: Generic setting `<config>` for the 
reporter named `<name>`.
 - `metrics.reporter.<name>.class`: The reporter class to use for the reporter 
named `<name>`.
 - `metrics.reporter.<name>.interval`: The reporter interval to use for the 
reporter named `<name>`.
+- `metrics.reporter.<name>.scope.delimiter`: The delimiter to use for the 
identifier (default value use `metrics.scope.delimiter`) for the reporter named 
`<name>`.
 
 All reporters must at least have the `class` property, some allow specifying a 
reporting `interval`. Below,
 we will list more settings specific to each reporter.

http://git-wip-us.apache.org/repos/asf/flink/blob/85b53444/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index a828281..a64b631 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -847,6 +847,9 @@ public final class ConfigConstants {
        /** The interval between reports. This is used as a suffix in an actual 
reporter config */
        public static final String METRICS_REPORTER_INTERVAL_SUFFIX = 
"interval";
 
+       /**     The delimiter used to assemble the metric identifier. This is 
used as a suffix in an actual reporter config. */
+       public static final String METRICS_REPORTER_SCOPE_DELIMITER = 
"scope.delimiter";
+
        /** The delimiter used to assemble the metric identifier. */
        public static final String METRICS_SCOPE_DELIMITER = 
"metrics.scope.delimiter";
 

http://git-wip-us.apache.org/repos/asf/flink/blob/85b53444/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
index ae44812..e68339b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java
@@ -30,6 +30,7 @@ import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.Scheduled;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
 import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
 import org.apache.flink.runtime.metrics.scope.ScopeFormats;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,15 +54,15 @@ public class MetricRegistry {
        private ActorRef queryService;
 
        private final ScopeFormats scopeFormats;
-
-       private final char delimiter;
+       private final char globalDelimiter;
+       private final List<Character> delimiters = new ArrayList<>();
 
        /**
         * Creates a new MetricRegistry and starts the configured reporter.
         */
        public MetricRegistry(MetricRegistryConfiguration config) {
                this.scopeFormats = config.getScopeFormats();
-               this.delimiter = config.getDelimiter();
+               this.globalDelimiter = config.getDelimiter();
 
                // second, instantiate any custom configured reporters
                this.reporters = new ArrayList<>();
@@ -122,6 +123,13 @@ public class MetricRegistry {
                                                LOG.info("Reporting metrics for 
reporter {} of type {}.", namedReporter, className);
                                        }
                                        reporters.add(reporterInstance);
+
+                                       String delimiterForReporter = 
reporterConfig.getString(ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, 
String.valueOf(globalDelimiter));
+                                       if (delimiterForReporter.length() != 1) 
{
+                                               LOG.warn("Failed to parse 
delimiter '{}' for reporter '{}', using global delimiter '{}'.", 
delimiterForReporter, namedReporter, globalDelimiter);
+                                               delimiterForReporter = 
String.valueOf(globalDelimiter);
+                                       }
+                                       
this.delimiters.add(delimiterForReporter.charAt(0));
                                }
                                catch (Throwable t) {
                                        shutdownExecutor();
@@ -144,8 +152,28 @@ public class MetricRegistry {
                }
        }
 
+       /**
+        * Returns the global delimiter.
+        *
+        * @return global delimiter
+        */
        public char getDelimiter() {
-               return this.delimiter;
+               return this.globalDelimiter;
+       }
+
+       /**
+        * Returns the configured delimiter for the reporter with the given 
index.
+        *
+        * @param reporterIndex index of the reporter whose delimiter should be 
used
+        * @return configured reporter delimiter, or global delimiter if index 
is invalid
+        */
+       public char getDelimiter(int reporterIndex) {
+               try {
+                       return delimiters.get(reporterIndex);
+               } catch (IndexOutOfBoundsException e) {
+                       LOG.warn("Delimiter for reporter index {} not found, 
returning global delimiter.", reporterIndex);
+                       return this.globalDelimiter;
+               }
        }
 
        public List<MetricReporter> getReporters() {
@@ -198,17 +226,19 @@ public class MetricRegistry {
         * @param metricName  the name of the metric
         * @param group       the group that contains the metric
         */
-       public void register(Metric metric, String metricName, MetricGroup 
group) {
+       public void register(Metric metric, String metricName, 
AbstractMetricGroup group) {
                try {
                        if (reporters != null) {
-                               for (MetricReporter reporter : reporters) {
+                               for (int i = 0; i < reporters.size(); i++) {
+                                       MetricReporter reporter = 
reporters.get(i);
                                        if (reporter != null) {
-                                               
reporter.notifyOfAddedMetric(metric, metricName, group);
+                                               FrontMetricGroup front = new 
FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
+                                               
reporter.notifyOfAddedMetric(metric, metricName, front);
                                        }
                                }
                        }
                        if (queryService != null) {
-                               
MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, 
(AbstractMetricGroup) group);
+                               
MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, group);
                        }
                } catch (Exception e) {
                        LOG.error("Error while registering metric.", e);
@@ -222,12 +252,14 @@ public class MetricRegistry {
         * @param metricName  the name of the metric
         * @param group       the group that contains the metric
         */
-       public void unregister(Metric metric, String metricName, MetricGroup 
group) {
+       public void unregister(Metric metric, String metricName, 
AbstractMetricGroup group) {
                try {
                        if (reporters != null) {
-                               for (MetricReporter reporter : reporters) {
+                               for (int i = 0; i < reporters.size(); i++) {
+                                       MetricReporter reporter = 
reporters.get(i);
                                        if (reporter != null) {
-                                               
reporter.notifyOfRemovedMetric(metric, metricName, group);
+                                               FrontMetricGroup front = new 
FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
+                                               
reporter.notifyOfRemovedMetric(metric, metricName, front);
                                        }
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/85b53444/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
index d9338ec..907c655 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java
@@ -183,6 +183,25 @@ public abstract class AbstractMetricGroup<A extends 
AbstractMetricGroup<?>> impl
                        return scopeString + registry.getDelimiter() + 
metricName;
                }
        }
+
+       /**
+        * Returns the fully qualified metric name using the configured 
delimiter for the reporter with the given index, for example
+        * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"}
+        *
+        * @param metricName metric name
+        * @param filter character filter which is applied to the scope 
components if not null.
+        * @param reporterIndex index of the reporter whose delimiter should be 
used
+        * @return fully qualified metric name
+        */
+       public String getMetricIdentifier(String metricName, CharacterFilter 
filter, int reporterIndex) {
+               if (filter != null) {
+                       scopeString = ScopeFormat.concat(filter, 
registry.getDelimiter(reporterIndex), scopeComponents);
+                       return scopeString + 
registry.getDelimiter(reporterIndex) + filter.filterCharacters(metricName);
+               } else {
+                       scopeString = 
ScopeFormat.concat(registry.getDelimiter(reporterIndex), scopeComponents);
+                       return scopeString + 
registry.getDelimiter(reporterIndex) + metricName;
+               }
+       }
        
        // 
------------------------------------------------------------------------
        //  Closing

http://git-wip-us.apache.org/repos/asf/flink/blob/85b53444/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java
new file mode 100644
index 0000000..885e6d6
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.metrics.CharacterFilter;
+
+/**
+ * Metric group which forwards all registration calls to a variable parent 
metric group that injects a variable reporter
+ * index into calls to {@link 
org.apache.flink.metrics.MetricGroup#getMetricIdentifier(String)}
+ * or {@link org.apache.flink.metrics.MetricGroup#getMetricIdentifier(String, 
CharacterFilter)}.
+ * This allows us to use reporter-specific delimiters, without requiring any 
action by the reporter.
+ *
+ * @param <P> parentMetricGroup to {@link AbstractMetricGroup 
AbstractMetricGroup}
+ */
+public class FrontMetricGroup<P extends AbstractMetricGroup<?>> extends 
ProxyMetricGroup<P> {
+
+       protected int reporterIndex;
+
+       public FrontMetricGroup(int reporterIndex, P reference) {
+               super(reference);
+               this.reporterIndex = reporterIndex;
+       }
+
+       @Override
+       public String getMetricIdentifier(String metricName) {
+               return parentMetricGroup.getMetricIdentifier(metricName, null, 
this.reporterIndex);
+       }
+
+       @Override
+       public String getMetricIdentifier(String metricName, CharacterFilter 
filter) {
+               return parentMetricGroup.getMetricIdentifier(metricName, 
filter, this.reporterIndex);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/85b53444/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java
index 0ebf749..2d49913 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java
@@ -36,7 +36,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class ProxyMetricGroup<P extends MetricGroup> implements MetricGroup {
 
-       private final P parentMetricGroup;
+       protected final P parentMetricGroup;
 
        public ProxyMetricGroup(P parentMetricGroup) {
                this.parentMetricGroup = checkNotNull(parentMetricGroup);

http://git-wip-us.apache.org/repos/asf/flink/blob/85b53444/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
index 5f7defb..1157215 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.reporter.Scheduled;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.scope.ScopeFormats;
@@ -33,10 +34,14 @@ import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.List;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class MetricRegistryTest extends TestLogger {
+
+       private static final char GLOBAL_DEFAULT_DELIMITER = '.';
        
        /**
         * Verifies that the reporter class argument is correctly used to 
instantiate and open the reporter.
@@ -280,4 +285,77 @@ public class MetricRegistryTest extends TestLogger {
 
                registry.shutdown();
        }
+
+       @Test
+       public void testConfigurableDelimiterForReporters() {
+               Configuration config = new Configuration();
+               config.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test1,test2,test3");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter.class.getName());
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter.class.getName());
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test3." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "AA");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter.class.getName());
+
+               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+
+               assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter());
+               assertEquals('_', registry.getDelimiter(0));
+               assertEquals('-', registry.getDelimiter(1));
+               assertEquals(GLOBAL_DEFAULT_DELIMITER, 
registry.getDelimiter(2));
+               assertEquals(GLOBAL_DEFAULT_DELIMITER, 
registry.getDelimiter(3));
+               assertEquals(GLOBAL_DEFAULT_DELIMITER, 
registry.getDelimiter(-1));
+
+               registry.shutdown();
+       }
+
+       @Test
+       public void testConfigurableDelimiterForReportersInGroup() {
+               Configuration config = new Configuration();
+               config.setString(ConfigConstants.METRICS_REPORTERS_LIST, 
"test1,test2,test3,test4");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter8.class.getName());
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter8.class.getName());
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test3." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "AA");
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter8.class.getName());
+               config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + 
"test4." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, 
TestReporter8.class.getName());
+               config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, 
"A.B");
+
+               MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+               List<MetricReporter> reporters = registry.getReporters();
+               ((TestReporter8)reporters.get(0)).expectedDelimiter = '_'; 
//test1  reporter
+               ((TestReporter8)reporters.get(1)).expectedDelimiter = '-'; 
//test2 reporter
+               ((TestReporter8)reporters.get(2)).expectedDelimiter = 
GLOBAL_DEFAULT_DELIMITER; //test3 reporter, because 'AA' - not correct delimiter
+               ((TestReporter8)reporters.get(3)).expectedDelimiter = 
GLOBAL_DEFAULT_DELIMITER; //for test4 reporter use global delimiter
+
+               TaskManagerMetricGroup group = new 
TaskManagerMetricGroup(registry, "host", "id");
+               group.counter("C");
+               group.close();
+               registry.shutdown();
+               assertEquals(4, TestReporter8.numCorrectDelimitersForRegister);
+               assertEquals(4, 
TestReporter8.numCorrectDelimitersForUnregister);
+       }
+
+       public static class TestReporter8 extends TestReporter {
+               char expectedDelimiter;
+               public static int numCorrectDelimitersForRegister = 0;
+               public static int numCorrectDelimitersForUnregister = 0;
+
+               @Override
+               public void notifyOfAddedMetric(Metric metric, String 
metricName, MetricGroup group) {
+                       String expectedMetric = "A" + expectedDelimiter + "B" + 
expectedDelimiter + "C";
+                       assertEquals(expectedMetric, 
group.getMetricIdentifier(metricName, this));
+                       assertEquals(expectedMetric, 
group.getMetricIdentifier(metricName));
+                       numCorrectDelimitersForRegister++;
+               }
+
+               @Override
+               public void notifyOfRemovedMetric(Metric metric, String 
metricName, MetricGroup group) {
+                       String expectedMetric = "A" + expectedDelimiter + "B" + 
expectedDelimiter + "C";
+                       assertEquals(expectedMetric, 
group.getMetricIdentifier(metricName, this));
+                       assertEquals(expectedMetric, 
group.getMetricIdentifier(metricName));
+                       numCorrectDelimitersForUnregister++;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/85b53444/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
index 6a6e7aa..8a1a006 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
@@ -153,12 +153,12 @@ public class MetricGroupTest extends TestLogger {
                }
 
                @Override
-               public void register(Metric metric, String name, MetricGroup 
parent) {
+               public void register(Metric metric, String name, 
AbstractMetricGroup parent) {
                        fail("Metric should never be registered");
                }
 
                @Override
-               public void unregister(Metric metric, String name, MetricGroup 
parent) {
+               public void unregister(Metric metric, String name, 
AbstractMetricGroup parent) {
                        fail("Metric should never be un-registered");
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/85b53444/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
index b2ae69e..eb906d0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Metric;
-import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
@@ -156,13 +155,13 @@ public class TaskMetricGroupTest extends TestLogger {
                }
 
                @Override
-               public void register(Metric metric, String metricName, 
MetricGroup group) {
+               public void register(Metric metric, String metricName, 
AbstractMetricGroup group) {
                        super.register(metric, metricName, group);
                        counter++;
                }
 
                @Override
-               public void unregister(Metric metric, String metricName, 
MetricGroup group) {
+               public void unregister(Metric metric, String metricName, 
AbstractMetricGroup group) {
                        super.unregister(metric, metricName, group);
                        counter--;
                }

Reply via email to