This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8bb629460ae45b841034be660aaace3851f141fe Author: Qingsheng Ren <[email protected]> AuthorDate: Tue Jun 29 11:53:50 2021 +0800 [hotfix][testutil] Add test utilization for listening metric registration --- .../flink/metrics/testutils/MetricListener.java | 130 ++++++++++++++++++++ .../flink/metric/testutils/MetricListenerTest.java | 131 +++++++++++++++++++++ 2 files changed, 261 insertions(+) diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/metrics/testutils/MetricListener.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/metrics/testutils/MetricListener.java new file mode 100644 index 0000000..cbada82 --- /dev/null +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/metrics/testutils/MetricListener.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.metrics.testutils;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.groups.GenericMetricGroup;
+import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Test utility that records the metrics registered under its root metric group in an internal
+ * {@link HashMap}, keyed by metric identifier, so that tests can fetch them afterwards.
+ */
+public class MetricListener {
+
+    /** Delimiter placed between the components of a metric identifier. */
+    public static final String DELIMITER = ".";
+
+    /** Name of the root metric group; {@link #getMetric} prefixes every lookup key with it. */
+    public static final String ROOT_METRIC_GROUP_NAME = "rootMetricGroup";
+
+    /** Root metric group handed out to callers for registering metrics. */
+    private final MetricGroup rootMetricGroup;
+
+    /** Registered metrics, keyed by the identifier their group reported at registration. */
+    private final Map<String, Metric> metrics = new HashMap<>();
+
+    /** Creates a listener whose root group is named {@link #ROOT_METRIC_GROUP_NAME}. */
+    public MetricListener() {
+        final char delimiter = DELIMITER.charAt(0);
+        final TestingMetricRegistry registry =
+                TestingMetricRegistry.builder()
+                        .setDelimiter(delimiter)
+                        // Record each metric under the identifier its group reports for it.
+                        .setRegisterConsumer(
+                                (metric, name, group) ->
+                                        metrics.put(group.getMetricIdentifier(name), metric))
+                        .build();
+        rootMetricGroup = new GenericMetricGroup(registry, null, ROOT_METRIC_GROUP_NAME);
+    }
+
+    /**
+     * Returns the root metric group of this listener. Only metrics registered under this group
+     * are recorded by the listener.
+     *
+     * @return Root metric group
+     */
+    public MetricGroup getMetricGroup() {
+        return rootMetricGroup;
+    }
+
+    /**
+     * Looks up a registered {@link Metric} by identifier relative to the root metric group.
+     *
+     * <p>For example, the metric "myMetric" registered in group "myGroup" under the root metric
+     * group is reached with the identifier ("myGroup", "myMetric").
+     *
+     * @param metricType class the registered metric is cast to
+     * @param identifier identifier relative to the root metric group
+     * @param <T> type of the metric
+     * @return Registered metric
+     * @throws IllegalArgumentException if no metric is registered under the identifier
+     */
+    public <T extends Metric> T getMetric(Class<T> metricType, String... identifier) {
+        final String relativeIdentifier = String.join(DELIMITER, identifier);
+        final String actualIdentifier = ROOT_METRIC_GROUP_NAME + DELIMITER + relativeIdentifier;
+        if (metrics.containsKey(actualIdentifier)) {
+            return metricType.cast(metrics.get(actualIdentifier));
+        }
+        throw new IllegalArgumentException(
+                String.format("Metric '%s' is not registered", actualIdentifier));
+    }
+
+    /**
+     * Looks up a registered {@link Meter} by identifier relative to the root metric group.
+     *
+     * @param identifier identifier relative to the root metric group
+     * @return Registered meter
+     */
+    public Meter getMeter(String... identifier) {
+        return getMetric(Meter.class, identifier);
+    }
+
+    /**
+     * Looks up a registered {@link Counter} by identifier relative to the root metric group.
+     *
+     * @param identifier identifier relative to the root metric group
+     * @return Registered counter
+     */
+    public Counter getCounter(String... identifier) {
+        return getMetric(Counter.class, identifier);
+    }
+
+    /**
+     * Looks up a registered {@link Histogram} by identifier relative to the root metric group.
+     *
+     * @param identifier identifier relative to the root metric group
+     * @return Registered histogram
+     */
+    public Histogram getHistogram(String... identifier) {
+        return getMetric(Histogram.class, identifier);
+    }
+
+    /**
+     * Looks up a registered {@link Gauge} by identifier relative to the root metric group.
+     *
+     * @param identifier identifier relative to the root metric group
+     * @param <T> value type of the gauge; not checked at runtime
+     * @return Registered gauge
+     */
+    public <T> Gauge<T> getGauge(String... identifier) {
+        // Only the raw Gauge type can be checked; the value type T is the caller's assertion.
+        @SuppressWarnings("unchecked")
+        final Gauge<T> gauge = (Gauge<T>) getMetric(Gauge.class, identifier);
+        return gauge;
+    }
+}
diff --git a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/metric/testutils/MetricListenerTest.java b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/metric/testutils/MetricListenerTest.java
new file mode 100644
index 0000000..292f522
--- /dev/null
+++ b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/metric/testutils/MetricListenerTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metric.testutils;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.testutils.MetricListener;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test for {@link MetricListener}.
*/
+public class MetricListenerTest {
+    // Names under which the metrics are registered.
+    public static final String COUNTER_NAME = "testCounter";
+    public static final String GAUGE_NAME = "testGauge";
+    public static final String METER_NAME = "testMeter";
+    public static final String HISTOGRAM_NAME = "testHistogram";
+
+    // Names of the metric groups nested under the root group.
+    public static final String GROUP_A = "groupA";
+    public static final String GROUP_B = "groupB";
+    public static final String GROUP_A_1 = "groupA_1";
+    public static final String GROUP_B_1 = "groupB_1";
+    public static final String GROUP_B_2 = "groupB_2";
+
+    /** Verifies that metrics registered on the root group can be fetched from the listener. */
+    @Test
+    public void testRegisterMetrics() {
+        final MetricListener listener = new MetricListener();
+        final MetricGroup root = listener.getMetricGroup();
+
+        // Counter
+        final Counter counter = root.counter(COUNTER_NAME);
+        counter.inc(15213);
+        final Counter fetchedCounter = listener.getCounter(COUNTER_NAME);
+        assertEquals(15213L, fetchedCounter.getCount());
+
+        // Gauge
+        root.gauge(GAUGE_NAME, () -> 15213);
+        final Gauge<Integer> fetchedGauge = listener.getGauge(GAUGE_NAME);
+        assertEquals(Integer.valueOf(15213), fetchedGauge.getValue());
+
+        // Meter
+        root.meter(METER_NAME, new FixedMeter(15213.0, 18213L));
+        final Meter fetchedMeter = listener.getMeter(METER_NAME);
+        assertEquals(15213.0, fetchedMeter.getRate(), 0.1);
+        assertEquals(18213L, fetchedMeter.getCount());
+
+        // Histogram
+        root.histogram(HISTOGRAM_NAME, new FixedHistogram(15213L));
+        final Histogram fetchedHistogram = listener.getHistogram(HISTOGRAM_NAME);
+        assertEquals(15213L, fetchedHistogram.getCount());
+    }
+
+    /** Verifies that metrics registered in nested groups can be fetched by group path and name. */
+    @Test
+    public void testRegisterMetricGroup() {
+        final MetricListener listener = new MetricListener();
+        final MetricGroup root = listener.getMetricGroup();
+
+        // Layout: root -> groupA -> groupA_1, root -> groupB -> {groupB_1, groupB_2}
+        final MetricGroup groupA = root.addGroup(GROUP_A);
+        final MetricGroup groupA1 = groupA.addGroup(GROUP_A_1);
+        final MetricGroup groupB = root.addGroup(GROUP_B);
+        final MetricGroup groupB1 = groupB.addGroup(GROUP_B_1);
+        final MetricGroup groupB2 = groupB.addGroup(GROUP_B_2);
+
+        groupA1.counter(COUNTER_NAME).inc(18213L);
+        groupB1.gauge(GAUGE_NAME, () -> 15213L);
+        groupB2.counter(COUNTER_NAME).inc(15513L);
+
+        // groupA.groupA_1.testCounter
+        final Counter counterInA1 = listener.getCounter(GROUP_A, GROUP_A_1, COUNTER_NAME);
+        assertEquals(18213L, counterInA1.getCount());
+
+        // groupB.groupB_1.testGauge
+        assertEquals(15213L, listener.getGauge(GROUP_B, GROUP_B_1, GAUGE_NAME).getValue());
+
+        // groupB.groupB_2.testCounter
+        final Counter counterInB2 = listener.getCounter(GROUP_B, GROUP_B_2, COUNTER_NAME);
+        assertEquals(15513L, counterInB2.getCount());
+    }
+
+    /** {@link Meter} stub that ignores marked events and reports a fixed rate and count. */
+    private static final class FixedMeter implements Meter {
+        private final double rate;
+        private final long count;
+
+        private FixedMeter(double rate, long count) {
+            this.rate = rate;
+            this.count = count;
+        }
+
+        @Override
+        public void markEvent() {}
+
+        @Override
+        public void markEvent(long n) {}
+
+        @Override
+        public double getRate() {
+            return rate;
+        }
+
+        @Override
+        public long getCount() {
+            return count;
+        }
+    }
+
+    /** {@link Histogram} stub that ignores updates, reports a fixed count and no statistics. */
+    private static final class FixedHistogram implements Histogram {
+        private final long count;
+
+        private FixedHistogram(long count) {
+            this.count = count;
+        }
+
+        @Override
+        public void update(long value) {}
+
+        @Override
+        public long getCount() {
+            return count;
+        }
+
+        @Override
+        public HistogramStatistics getStatistics() {
+            return null;
+        }
+    }
+}
