Repository: flink Updated Branches: refs/heads/master 5dc624bf8 -> 85b53444a
[FLINK-4564] [metrics] Configurable delimiter per reporter This closes #2517. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/85b53444 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/85b53444 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/85b53444 Branch: refs/heads/master Commit: 85b53444a852027714904a050a68489740b7f2f8 Parents: 5dc624b Author: Anton Mushin <anton_mus...@epam.com> Authored: Tue Sep 20 14:09:40 2016 +0400 Committer: zentol <ches...@apache.org> Committed: Fri Oct 14 15:49:26 2016 +0200 ---------------------------------------------------------------------- docs/monitoring/metrics.md | 1 + .../flink/configuration/ConfigConstants.java | 3 + .../flink/runtime/metrics/MetricRegistry.java | 54 +++++++++++--- .../metrics/groups/AbstractMetricGroup.java | 19 +++++ .../metrics/groups/FrontMetricGroup.java | 49 ++++++++++++ .../metrics/groups/ProxyMetricGroup.java | 2 +- .../runtime/metrics/MetricRegistryTest.java | 78 ++++++++++++++++++++ .../runtime/metrics/groups/MetricGroupTest.java | 4 +- .../metrics/groups/TaskMetricGroupTest.java | 5 +- 9 files changed, 198 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/85b53444/docs/monitoring/metrics.md ---------------------------------------------------------------------- diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md index 6de5b5e..cb756c5 100644 --- a/docs/monitoring/metrics.md +++ b/docs/monitoring/metrics.md @@ -280,6 +280,7 @@ Metrics can be exposed to an external system by configuring one or several repor - `metrics.reporter.<name>.<config>`: Generic setting `<config>` for the reporter named `<name>`. - `metrics.reporter.<name>.class`: The reporter class to use for the reporter named `<name>`. - `metrics.reporter.<name>.interval`: The reporter interval to use for the reporter named `<name>`. +- `metrics.reporter.<name>.scope.delimiter`: The delimiter to use for the identifier (default value use `metrics.scope.delimiter`) for the reporter named `<name>`. All reporters must at least have the `class` property, some allow specifying a reporting `interval`. Below, we will list more settings specific to each reporter. http://git-wip-us.apache.org/repos/asf/flink/blob/85b53444/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index a828281..a64b631 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -847,6 +847,9 @@ public final class ConfigConstants { /** The interval between reports. This is used as a suffix in an actual reporter config */ public static final String METRICS_REPORTER_INTERVAL_SUFFIX = "interval"; + /** The delimiter used to assemble the metric identifier. This is used as a suffix in an actual reporter config. */ + public static final String METRICS_REPORTER_SCOPE_DELIMITER = "scope.delimiter"; + /** The delimiter used to assemble the metric identifier. */ public static final String METRICS_SCOPE_DELIMITER = "metrics.scope.delimiter"; http://git-wip-us.apache.org/repos/asf/flink/blob/85b53444/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java index ae44812..e68339b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistry.java @@ -30,6 +30,7 @@ import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.metrics.reporter.Scheduled; import org.apache.flink.runtime.metrics.dump.MetricQueryService; import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup; +import org.apache.flink.runtime.metrics.groups.FrontMetricGroup; import org.apache.flink.runtime.metrics.scope.ScopeFormats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,15 +54,15 @@ public class MetricRegistry { private ActorRef queryService; private final ScopeFormats scopeFormats; - - private final char delimiter; + private final char globalDelimiter; + private final List<Character> delimiters = new ArrayList<>(); /** * Creates a new MetricRegistry and starts the configured reporter. */ public MetricRegistry(MetricRegistryConfiguration config) { this.scopeFormats = config.getScopeFormats(); - this.delimiter = config.getDelimiter(); + this.globalDelimiter = config.getDelimiter(); // second, instantiate any custom configured reporters this.reporters = new ArrayList<>(); @@ -122,6 +123,13 @@ public class MetricRegistry { LOG.info("Reporting metrics for reporter {} of type {}.", namedReporter, className); } reporters.add(reporterInstance); + + String delimiterForReporter = reporterConfig.getString(ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, String.valueOf(globalDelimiter)); + if (delimiterForReporter.length() != 1) { + LOG.warn("Failed to parse delimiter '{}' for reporter '{}', using global delimiter '{}'.", delimiterForReporter, namedReporter, globalDelimiter); + delimiterForReporter = String.valueOf(globalDelimiter); + } + this.delimiters.add(delimiterForReporter.charAt(0)); } catch (Throwable t) { shutdownExecutor(); @@ -144,8 +152,28 @@ public class MetricRegistry { } } + /** + * Returns the global delimiter. + * + * @return global delimiter + */ public char getDelimiter() { - return this.delimiter; + return this.globalDelimiter; + } + + /** + * Returns the configured delimiter for the reporter with the given index. + * + * @param reporterIndex index of the reporter whose delimiter should be used + * @return configured reporter delimiter, or global delimiter if index is invalid + */ + public char getDelimiter(int reporterIndex) { + try { + return delimiters.get(reporterIndex); + } catch (IndexOutOfBoundsException e) { + LOG.warn("Delimiter for reporter index {} not found, returning global delimiter.", reporterIndex); + return this.globalDelimiter; + } } public List<MetricReporter> getReporters() { @@ -198,17 +226,19 @@ public class MetricRegistry { * @param metricName the name of the metric * @param group the group that contains the metric */ - public void register(Metric metric, String metricName, MetricGroup group) { + public void register(Metric metric, String metricName, AbstractMetricGroup group) { try { if (reporters != null) { - for (MetricReporter reporter : reporters) { + for (int i = 0; i < reporters.size(); i++) { + MetricReporter reporter = reporters.get(i); if (reporter != null) { - reporter.notifyOfAddedMetric(metric, metricName, group); + FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group); + reporter.notifyOfAddedMetric(metric, metricName, front); } } } if (queryService != null) { - MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, (AbstractMetricGroup) group); + MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, group); } } catch (Exception e) { LOG.error("Error while registering metric.", e); @@ -222,12 +252,14 @@ public class MetricRegistry { * @param metricName the name of the metric * @param group the group that contains the metric */ - public void unregister(Metric metric, String metricName, MetricGroup group) { + public void unregister(Metric metric, String metricName, AbstractMetricGroup group) { try { if (reporters != null) { - for (MetricReporter reporter : reporters) { + for (int i = 0; i < reporters.size(); i++) { + MetricReporter reporter = reporters.get(i); if (reporter != null) { - reporter.notifyOfRemovedMetric(metric, metricName, group); + FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group); + reporter.notifyOfRemovedMetric(metric, metricName, front); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/85b53444/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java index d9338ec..907c655 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroup.java @@ -183,6 +183,25 @@ public abstract class AbstractMetricGroup<A extends AbstractMetricGroup<?>> impl return scopeString + registry.getDelimiter() + metricName; } } + + /** + * Returns the fully qualified metric name using the configured delimiter for the reporter with the given index, for example + * {@code "host-7.taskmanager-2.window_word_count.my-mapper.metricName"} + * + * @param metricName metric name + * @param filter character filter which is applied to the scope components if not null. + * @param reporterIndex index of the reporter whose delimiter should be used + * @return fully qualified metric name + */ + public String getMetricIdentifier(String metricName, CharacterFilter filter, int reporterIndex) { + if (filter != null) { + scopeString = ScopeFormat.concat(filter, registry.getDelimiter(reporterIndex), scopeComponents); + return scopeString + registry.getDelimiter(reporterIndex) + filter.filterCharacters(metricName); + } else { + scopeString = ScopeFormat.concat(registry.getDelimiter(reporterIndex), scopeComponents); + return scopeString + registry.getDelimiter(reporterIndex) + metricName; + } + } // ------------------------------------------------------------------------ // Closing http://git-wip-us.apache.org/repos/asf/flink/blob/85b53444/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java new file mode 100644 index 0000000..885e6d6 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/FrontMetricGroup.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics.groups; + +import org.apache.flink.metrics.CharacterFilter; + +/** + * Metric group which forwards all registration calls to a variable parent metric group that injects a variable reporter + * index into calls to {@link org.apache.flink.metrics.MetricGroup#getMetricIdentifier(String)} + * or {@link org.apache.flink.metrics.MetricGroup#getMetricIdentifier(String, CharacterFilter)}. + * This allows us to use reporter-specific delimiters, without requiring any action by the reporter. + * + * @param <P> parentMetricGroup to {@link AbstractMetricGroup AbstractMetricGroup} + */ +public class FrontMetricGroup<P extends AbstractMetricGroup<?>> extends ProxyMetricGroup<P> { + + protected int reporterIndex; + + public FrontMetricGroup(int reporterIndex, P reference) { + super(reference); + this.reporterIndex = reporterIndex; + } + + @Override + public String getMetricIdentifier(String metricName) { + return parentMetricGroup.getMetricIdentifier(metricName, null, this.reporterIndex); + } + + @Override + public String getMetricIdentifier(String metricName, CharacterFilter filter) { + return parentMetricGroup.getMetricIdentifier(metricName, filter, this.reporterIndex); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/85b53444/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java index 0ebf749..2d49913 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/ProxyMetricGroup.java @@ -36,7 +36,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class ProxyMetricGroup<P extends MetricGroup> implements MetricGroup { - private final P parentMetricGroup; + protected final P parentMetricGroup; public ProxyMetricGroup(P parentMetricGroup) { this.parentMetricGroup = checkNotNull(parentMetricGroup); http://git-wip-us.apache.org/repos/asf/flink/blob/85b53444/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java index 5f7defb..1157215 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java @@ -24,6 +24,7 @@ import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricConfig; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.metrics.reporter.Scheduled; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.scope.ScopeFormats; @@ -33,10 +34,14 @@ import org.apache.flink.util.TestLogger; import org.junit.Assert; import org.junit.Test; +import java.util.List; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; public class MetricRegistryTest extends TestLogger { + + private static final char GLOBAL_DEFAULT_DELIMITER = '.'; /** * Verifies that the reporter class argument is correctly used to instantiate and open the reporter. @@ -280,4 +285,77 @@ public class MetricRegistryTest extends TestLogger { registry.shutdown(); } + + @Test + public void testConfigurableDelimiterForReporters() { + Configuration config = new Configuration(); + config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2,test3"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "AA"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName()); + + MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); + + assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter()); + assertEquals('_', registry.getDelimiter(0)); + assertEquals('-', registry.getDelimiter(1)); + assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter(2)); + assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter(3)); + assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter(-1)); + + registry.shutdown(); + } + + @Test + public void testConfigurableDelimiterForReportersInGroup() { + Configuration config = new Configuration(); + config.setString(ConfigConstants.METRICS_REPORTERS_LIST, "test1,test2,test3,test4"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "AA"); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName()); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test4." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName()); + config.setString(ConfigConstants.METRICS_SCOPE_NAMING_TM, "A.B"); + + MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); + List<MetricReporter> reporters = registry.getReporters(); + ((TestReporter8)reporters.get(0)).expectedDelimiter = '_'; //test1 reporter + ((TestReporter8)reporters.get(1)).expectedDelimiter = '-'; //test2 reporter + ((TestReporter8)reporters.get(2)).expectedDelimiter = GLOBAL_DEFAULT_DELIMITER; //test3 reporter, because 'AA' - not correct delimiter + ((TestReporter8)reporters.get(3)).expectedDelimiter = GLOBAL_DEFAULT_DELIMITER; //for test4 reporter use global delimiter + + TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "host", "id"); + group.counter("C"); + group.close(); + registry.shutdown(); + assertEquals(4, TestReporter8.numCorrectDelimitersForRegister); + assertEquals(4, TestReporter8.numCorrectDelimitersForUnregister); + } + + public static class TestReporter8 extends TestReporter { + char expectedDelimiter; + public static int numCorrectDelimitersForRegister = 0; + public static int numCorrectDelimitersForUnregister = 0; + + @Override + public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) { + String expectedMetric = "A" + expectedDelimiter + "B" + expectedDelimiter + "C"; + assertEquals(expectedMetric, group.getMetricIdentifier(metricName, this)); + assertEquals(expectedMetric, group.getMetricIdentifier(metricName)); + numCorrectDelimitersForRegister++; + } + + @Override + public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) { + String expectedMetric = "A" + expectedDelimiter + "B" + expectedDelimiter + "C"; + assertEquals(expectedMetric, group.getMetricIdentifier(metricName, this)); + assertEquals(expectedMetric, group.getMetricIdentifier(metricName)); + numCorrectDelimitersForUnregister++; + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/85b53444/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java index 6a6e7aa..8a1a006 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java @@ -153,12 +153,12 @@ public class MetricGroupTest extends TestLogger { } @Override - public void register(Metric metric, String name, MetricGroup parent) { + public void register(Metric metric, String name, AbstractMetricGroup parent) { fail("Metric should never be registered"); } @Override - public void unregister(Metric metric, String name, MetricGroup parent) { + public void unregister(Metric metric, String name, AbstractMetricGroup parent) { fail("Metric should never be un-registered"); } } http://git-wip-us.apache.org/repos/asf/flink/blob/85b53444/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java index b2ae69e..eb906d0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Metric; -import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.metrics.dump.QueryScopeInfo; @@ -156,13 +155,13 @@ public class TaskMetricGroupTest extends TestLogger { } @Override - public void register(Metric metric, String metricName, MetricGroup group) { + public void register(Metric metric, String metricName, AbstractMetricGroup group) { super.register(metric, metricName, group); counter++; } @Override - public void unregister(Metric metric, String metricName, MetricGroup group) { + public void unregister(Metric metric, String metricName, AbstractMetricGroup group) { super.unregister(metric, metricName, group); counter--; }