This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8bb629460ae45b841034be660aaace3851f141fe
Author: Qingsheng Ren <[email protected]>
AuthorDate: Tue Jun 29 11:53:50 2021 +0800

    [hotfix][testutil] Add test utilization for listening metric registration
---
 .../flink/metrics/testutils/MetricListener.java    | 130 ++++++++++++++++++++
 .../flink/metric/testutils/MetricListenerTest.java | 131 +++++++++++++++++++++
 2 files changed, 261 insertions(+)

diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/metrics/testutils/MetricListener.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/metrics/testutils/MetricListener.java
new file mode 100644
index 0000000..cbada82
--- /dev/null
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/metrics/testutils/MetricListener.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metrics.testutils;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.groups.GenericMetricGroup;
+import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A MetricListener listens metric and group registration under the provided 
root metric group, and
+ * stores them in an internal HashMap for fetching.
+ */
+public class MetricListener {
+
+    // Constants
+    public static final String DELIMITER = ".";
+    public static final String ROOT_METRIC_GROUP_NAME = "rootMetricGroup";
+
+    // Root metric group
+    private final MetricGroup rootMetricGroup;
+
+    // Map for storing registered metrics
+    private final Map<String, Metric> metrics = new HashMap<>();
+
+    public MetricListener() {
+        TestingMetricRegistry registry =
+                TestingMetricRegistry.builder()
+                        .setDelimiter(DELIMITER.charAt(0))
+                        .setRegisterConsumer(
+                                (metric, name, group) ->
+                                        
this.metrics.put(group.getMetricIdentifier(name), metric))
+                        .build();
+
+        this.rootMetricGroup = new GenericMetricGroup(registry, null, 
ROOT_METRIC_GROUP_NAME);
+    }
+
+    /**
+     * Get the root metric group of this listener. Note that only metrics and 
groups registered
+     * under this group will be listened.
+     *
+     * @return Root metric group
+     */
+    public MetricGroup getMetricGroup() {
+        return this.rootMetricGroup;
+    }
+
+    /**
+     * Get registered {@link Metric} with identifier relative to the root 
metric group.
+     *
+     * <p>For example, identifier of metric "myMetric" registered in group 
"myGroup" under root
+     * metric group can be reached by identifier ("myGroup", "myMetric")
+     *
+     * @param identifier identifier relative to the root metric group
+     * @return Registered metric
+     */
+    public <T extends Metric> T getMetric(Class<T> metricType, String... 
identifier) {
+        String actualIdentifier =
+                ROOT_METRIC_GROUP_NAME + DELIMITER + String.join(DELIMITER, 
identifier);
+        if (!metrics.containsKey(actualIdentifier)) {
+            throw new IllegalArgumentException(
+                    String.format("Metric '%s' is not registered", 
actualIdentifier));
+        }
+        return metricType.cast(metrics.get(actualIdentifier));
+    }
+
+    /**
+     * Get registered {@link Meter} with identifier relative to the root 
metric group.
+     *
+     * @param identifier identifier relative to the root metric group
+     * @return Registered meter
+     */
+    public Meter getMeter(String... identifier) {
+        return getMetric(Meter.class, identifier);
+    }
+
+    /**
+     * Get registered {@link Counter} with identifier relative to the root 
metric group.
+     *
+     * @param identifier identifier relative to the root metric group
+     * @return Registered counter
+     */
+    public Counter getCounter(String... identifier) {
+        return getMetric(Counter.class, identifier);
+    }
+
+    /**
+     * Get registered {@link Histogram} with identifier relative to the root 
metric group.
+     *
+     * @param identifier identifier relative to the root metric group
+     * @return Registered histogram
+     */
+    public Histogram getHistogram(String... identifier) {
+        return getMetric(Histogram.class, identifier);
+    }
+
+    /**
+     * Get registered {@link Gauge} with identifier relative to the root 
metric group.
+     *
+     * @param identifier identifier relative to the root metric group
+     * @return Registered gauge
+     */
+    @SuppressWarnings("unchecked")
+    public <T> Gauge<T> getGauge(String... identifier) {
+        return (Gauge<T>) getMetric(Gauge.class, identifier);
+    }
+}
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/metric/testutils/MetricListenerTest.java
 
b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/metric/testutils/MetricListenerTest.java
new file mode 100644
index 0000000..292f522
--- /dev/null
+++ 
b/flink-test-utils-parent/flink-test-utils/src/test/java/org/apache/flink/metric/testutils/MetricListenerTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.metric.testutils;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.testutils.MetricListener;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test for {@link MetricListener}. */
+public class MetricListenerTest {
+    public static final String COUNTER_NAME = "testCounter";
+    public static final String GAUGE_NAME = "testGauge";
+    public static final String METER_NAME = "testMeter";
+    public static final String HISTOGRAM_NAME = "testHistogram";
+
+    public static final String GROUP_A = "groupA";
+    public static final String GROUP_B = "groupB";
+    public static final String GROUP_A_1 = "groupA_1";
+    public static final String GROUP_B_1 = "groupB_1";
+    public static final String GROUP_B_2 = "groupB_2";
+
+    @Test
+    public void testRegisterMetrics() {
+        MetricListener metricListener = new MetricListener();
+        final MetricGroup metricGroup = metricListener.getMetricGroup();
+
+        // Counter
+        final Counter counter = metricGroup.counter(COUNTER_NAME);
+        counter.inc(15213);
+        final Counter registeredCounter = 
metricListener.getCounter(COUNTER_NAME);
+        assertEquals(15213L, registeredCounter.getCount());
+
+        // Gauge
+        metricGroup.gauge(GAUGE_NAME, () -> 15213);
+        final Gauge<Integer> registeredGauge = 
metricListener.getGauge(GAUGE_NAME);
+        assertEquals(Integer.valueOf(15213), registeredGauge.getValue());
+
+        // Meter
+        metricGroup.meter(
+                METER_NAME,
+                new Meter() {
+                    @Override
+                    public void markEvent() {}
+
+                    @Override
+                    public void markEvent(long n) {}
+
+                    @Override
+                    public double getRate() {
+                        return 15213.0;
+                    }
+
+                    @Override
+                    public long getCount() {
+                        return 18213L;
+                    }
+                });
+        final Meter registeredMeter = metricListener.getMeter(METER_NAME);
+        assertEquals(15213.0, registeredMeter.getRate(), 0.1);
+        assertEquals(18213L, registeredMeter.getCount());
+
+        // Histogram
+        metricGroup.histogram(
+                HISTOGRAM_NAME,
+                new Histogram() {
+                    @Override
+                    public void update(long value) {}
+
+                    @Override
+                    public long getCount() {
+                        return 15213L;
+                    }
+
+                    @Override
+                    public HistogramStatistics getStatistics() {
+                        return null;
+                    }
+                });
+        final Histogram registeredHistogram = 
metricListener.getHistogram(HISTOGRAM_NAME);
+        assertEquals(15213L, registeredHistogram.getCount());
+    }
+
+    @Test
+    public void testRegisterMetricGroup() {
+        MetricListener metricListener = new MetricListener();
+        final MetricGroup rootGroup = metricListener.getMetricGroup();
+        final MetricGroup groupA1 = 
rootGroup.addGroup(GROUP_A).addGroup(GROUP_A_1);
+        final MetricGroup groupB = rootGroup.addGroup(GROUP_B);
+        final MetricGroup groupB1 = groupB.addGroup(GROUP_B_1);
+        final MetricGroup groupB2 = groupB.addGroup(GROUP_B_2);
+
+        groupA1.counter(COUNTER_NAME).inc(18213L);
+        groupB1.gauge(GAUGE_NAME, () -> 15213L);
+        groupB2.counter(COUNTER_NAME).inc(15513L);
+
+        // groupA.groupA_1.testCounter
+        assertEquals(
+                18213L, metricListener.getCounter(GROUP_A, GROUP_A_1, 
COUNTER_NAME).getCount());
+
+        // groupB.groupB_1.testGauge
+        assertEquals(15213L, metricListener.getGauge(GROUP_B, GROUP_B_1, 
GAUGE_NAME).getValue());
+
+        // groupB.groupB_2.testCounter
+        assertEquals(
+                15513L, metricListener.getCounter(GROUP_B, GROUP_B_2, 
COUNTER_NAME).getCount());
+    }
+}

Reply via email to