dianfu commented on a change in pull request #11543: [FLINK-16672][python] 
Support Counter, Gauge, Meter, Distribution metric type for Python UDF
URL: https://github.com/apache/flink/pull/11543#discussion_r399330302
 
 

 ##########
 File path: 
flink-python/src/main/java/org/apache/flink/python/metric/FlinkMetricContainer.java
 ##########
 @@ -19,17 +19,267 @@
 package org.apache.flink.python.metric;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
 import org.apache.flink.metrics.MetricGroup;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
+import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
+import org.apache.beam.sdk.metrics.DistributionResult;
+import org.apache.beam.sdk.metrics.GaugeResult;
+import org.apache.beam.sdk.metrics.MetricKey;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricsFilter;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Helper class for forwarding Python metrics to Java accumulators and metrics.
  */
 @Internal
-public class FlinkMetricContainer {
+public final class FlinkMetricContainer {
+
+       private static final String METRIC_KEY_SEPARATOR =
+               
GlobalConfiguration.loadConfiguration().getString(MetricOptions.SCOPE_DELIMITER);
 
+       private final MetricsContainerStepMap metricsContainers;
        private final MetricGroup baseMetricGroup;
+       private final Map<String, Counter> flinkCounterCache;
+       private final Map<String, Meter> flinkMeterCache;
+       private final Map<String, FlinkDistributionGauge> 
flinkDistributionGaugeCache;
+       private final Map<String, FlinkGauge> flinkGaugeCache;
 
        public FlinkMetricContainer(MetricGroup metricGroup) {
-               this.baseMetricGroup = metricGroup;
+               this.baseMetricGroup = checkNotNull(metricGroup);
+               this.flinkCounterCache = new HashMap<>();
+               this.flinkMeterCache = new HashMap<>();
+               this.flinkDistributionGaugeCache = new HashMap<>();
+               this.flinkGaugeCache = new HashMap<>();
+               this.metricsContainers = new MetricsContainerStepMap();
+       }
+
+       private MetricsContainerImpl getMetricsContainer(String stepName) {
+               return metricsContainers.getContainer(stepName);
+       }
+
+       /**
+        * Update this container with metrics from the passed {@link 
MonitoringInfo}s, and send updates
+        * along to Flink's internal metrics framework.
+        */
+       public void updateMetrics(String stepName, List<MonitoringInfo> 
monitoringInfos) {
+               getMetricsContainer(stepName).update(monitoringInfos);
+               updateMetrics(stepName);
+       }
+
+       /**
+        * Update Flink's internal metrics ({@link this#flinkCounterCache}) 
with the latest metrics for
+        * a given step.
+        */
+       private void updateMetrics(String stepName) {
+               MetricResults metricResults = 
asAttemptedOnlyMetricResults(metricsContainers);
+               MetricQueryResults metricQueryResults =
+                       
metricResults.queryMetrics(MetricsFilter.builder().addStep(stepName).build());
+               updateCounterOrMeter(metricQueryResults.getCounters());
+               updateDistributions(metricQueryResults.getDistributions());
+               updateGauge(metricQueryResults.getGauges());
+       }
+
+       private boolean isUserMetric(MetricResult metricResult) {
+               MetricName metricName = metricResult.getKey().metricName();
+               return (metricName instanceof MonitoringInfoMetricName) &&
+                       ((MonitoringInfoMetricName) metricName).getUrn()
+                               
.contains(MonitoringInfoConstants.Urns.USER_COUNTER);
+       }
+
+       private void updateCounterOrMeter(Iterable<MetricResult<Long>> 
counters) {
+               for (MetricResult<Long> metricResult : counters) {
+                       if (!isUserMetric(metricResult)) {
+                               continue;
+                       }
+                       // get identifier
+                       String flinkMetricIdentifier = 
getFlinkMetricIdentifierString(metricResult.getKey());
+
+                       // get metric type
+                       ArrayList<String> scopeComponents = 
getNameSpaceArray(metricResult.getKey());
+                       if ((scopeComponents.size() % 2) != 0) {
+                               Meter meter;
+                               if 
(flinkMeterCache.containsKey(flinkMetricIdentifier)) {
 
 Review comment:
   Change to the following then it will only lookup the map once:
   
   counter = flinkCounterCache.get(flinkMetricIdentifier);
   if (counter == null) {
      XXX
   }

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to