This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e626750  [BEAM-6165] FlinkMetricContainer cleanup (#7637)
e626750 is described below

commit e6267500c5ad0682f558e36ecba45545499e336f
Author: Ryan Williams <ryan.blake.willi...@gmail.com>
AuthorDate: Sun Jan 27 22:26:28 2019 -0500

    [BEAM-6165] FlinkMetricContainer cleanup (#7637)
---
 .../beam/runners/core/metrics/MetricUrns.java      | 43 +++++++++++++++++++
 .../flink/metrics/FlinkMetricContainer.java        | 48 +++++++++-------------
 2 files changed, 63 insertions(+), 28 deletions(-)

diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUrns.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUrns.java
new file mode 100644
index 0000000..6b40603
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUrns.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.metrics;
+
+import static org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder.USER_COUNTER_URN_PREFIX;
+
+import org.apache.beam.sdk.metrics.MetricName;
+
+/** Utility for parsing a URN to a {@link MetricName}. */
+public class MetricUrns {
+  /**
+   * Parse a {@link MetricName} from a {@link
+   * org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoUrns.Enum}.
+   *
+   * <p>Should be consistent with {@code parse_namespace_and_name} in monitoring_infos.py.
+   */
+  public static MetricName parseUrn(String urn) {
+    if (urn.startsWith(USER_COUNTER_URN_PREFIX)) {
+      urn = urn.substring(USER_COUNTER_URN_PREFIX.length());
+    }
+    // If it is not a user counter, just use the first part of the URN, i.e. 'beam'
+    String[] pieces = urn.split(":", 2);
+    if (pieces.length != 2) {
+      throw new IllegalArgumentException("Invalid metric URN: " + urn);
+    }
+    return MetricName.named(pieces[0], pieces[1]);
+  }
+}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
index c235a51..23d1f83 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java
@@ -17,13 +17,18 @@
  */
 package org.apache.beam.runners.flink.metrics;
 
+import static org.apache.beam.runners.core.metrics.MetricUrns.parseUrn;
 import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults;
-import static org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder.USER_COUNTER_URN_PREFIX;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.CounterData;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.DistributionData;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.ExtremaData;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.IntDistributionData;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.Metric;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
 import org.apache.beam.sdk.metrics.Distribution;
@@ -90,36 +95,20 @@ public class FlinkMetricContainer {
   }
 
   /**
-   * Parse a {@link MetricName} from a {@link
-   * org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoUrns.Enum}
-   *
-   * <p>Should be consistent with {@code parse_namespace_and_name} in monitoring_infos.py
-   *
-   * <p>TODO: not flink-specific; where should it live?
+   * Update this container with metrics from the passed {@link MonitoringInfo}s, and send updates
+   * along to Flink's internal metrics framework.
    */
-  public static MetricName parseUrn(String urn) {
-    if (urn.startsWith(USER_COUNTER_URN_PREFIX)) {
-      urn = urn.substring(USER_COUNTER_URN_PREFIX.length());
-    }
-    // If it is not a user counter, just use the first part of the URN, i.e. 'beam'
-    String[] pieces = urn.split(":", 2);
-    if (pieces.length != 2) {
-      throw new IllegalArgumentException("Invalid metric URN: " + urn);
-    }
-    return MetricName.named(pieces[0], pieces[1]);
-  }
-
-  public void updateMetrics(String stepName, List<BeamFnApi.MonitoringInfo> monitoringInfos) {
+  public void updateMetrics(String stepName, List<MonitoringInfo> monitoringInfos) {
     MetricsContainer metricsContainer = getMetricsContainer(stepName);
     monitoringInfos.forEach(
         monitoringInfo -> {
           if (monitoringInfo.hasMetric()) {
             String urn = monitoringInfo.getUrn();
             MetricName metricName = parseUrn(urn);
-            BeamFnApi.Metric metric = monitoringInfo.getMetric();
+            Metric metric = monitoringInfo.getMetric();
             if (metric.hasCounterData()) {
-              BeamFnApi.CounterData counterData = metric.getCounterData();
-              if (counterData.getValueCase() == BeamFnApi.CounterData.ValueCase.INT64_VALUE) {
+              CounterData counterData = metric.getCounterData();
+              if (counterData.getValueCase() == CounterData.ValueCase.INT64_VALUE) {
                 org.apache.beam.sdk.metrics.Counter counter =
                     metricsContainer.getCounter(metricName);
                 counter.inc(counterData.getInt64Value());
@@ -127,11 +116,10 @@ public class FlinkMetricContainer {
                 LOG.warn("Unsupported CounterData type: {}", counterData);
               }
             } else if (metric.hasDistributionData()) {
-              BeamFnApi.DistributionData distributionData = metric.getDistributionData();
+              DistributionData distributionData = metric.getDistributionData();
               if (distributionData.hasIntDistributionData()) {
                 Distribution distribution = metricsContainer.getDistribution(metricName);
-                BeamFnApi.IntDistributionData intDistributionData =
-                    distributionData.getIntDistributionData();
+                IntDistributionData intDistributionData = distributionData.getIntDistributionData();
                 distribution.update(
                     intDistributionData.getSum(),
                     intDistributionData.getCount(),
@@ -141,7 +129,7 @@ public class FlinkMetricContainer {
                 LOG.warn("Unsupported DistributionData type: {}", distributionData);
               }
             } else if (metric.hasExtremaData()) {
-              BeamFnApi.ExtremaData extremaData = metric.getExtremaData();
+              ExtremaData extremaData = metric.getExtremaData();
               LOG.warn("Extrema metric unsupported: {}", extremaData);
             }
           }
@@ -149,6 +137,10 @@ public class FlinkMetricContainer {
     updateMetrics(stepName);
   }
 
+  /**
+   * Update Flink's internal metrics ({@link this#flinkCounterCache}) with the latest metrics for a
+   * given step.
+   */
   void updateMetrics(String stepName) {
     MetricResults metricResults = asAttemptedOnlyMetricResults(metricsAccumulator.getLocalValue());
     MetricQueryResults metricQueryResults =

Reply via email to