This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new e626750 [BEAM-6165] FlinkMetricContainer cleanup (#7637) e626750 is described below commit e6267500c5ad0682f558e36ecba45545499e336f Author: Ryan Williams <ryan.blake.willi...@gmail.com> AuthorDate: Sun Jan 27 22:26:28 2019 -0500 [BEAM-6165] FlinkMetricContainer cleanup (#7637) --- .../beam/runners/core/metrics/MetricUrns.java | 43 +++++++++++++++++++ .../flink/metrics/FlinkMetricContainer.java | 48 +++++++++------------- 2 files changed, 63 insertions(+), 28 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUrns.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUrns.java new file mode 100644 index 0000000..6b40603 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricUrns.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.metrics; + +import static org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder.USER_COUNTER_URN_PREFIX; + +import org.apache.beam.sdk.metrics.MetricName; + +/** Utility for parsing a URN to a {@link MetricName}. 
*/ +public class MetricUrns { + /** + * Parse a {@link MetricName} from a {@link + * org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoUrns.Enum}. + * + * <p>Should be consistent with {@code parse_namespace_and_name} in monitoring_infos.py. + */ + public static MetricName parseUrn(String urn) { + if (urn.startsWith(USER_COUNTER_URN_PREFIX)) { + urn = urn.substring(USER_COUNTER_URN_PREFIX.length()); + } + // If it is not a user counter, just use the first part of the URN, i.e. 'beam' + String[] pieces = urn.split(":", 2); + if (pieces.length != 2) { + throw new IllegalArgumentException("Invalid metric URN: " + urn); + } + return MetricName.named(pieces[0], pieces[1]); + } +} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java index c235a51..23d1f83 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java @@ -17,13 +17,18 @@ */ package org.apache.beam.runners.flink.metrics; +import static org.apache.beam.runners.core.metrics.MetricUrns.parseUrn; import static org.apache.beam.runners.core.metrics.MetricsContainerStepMap.asAttemptedOnlyMetricResults; -import static org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder.USER_COUNTER_URN_PREFIX; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.beam.model.fnexecution.v1.BeamFnApi; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.CounterData; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.DistributionData; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.ExtremaData; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.IntDistributionData; +import org.apache.beam.model.fnexecution.v1.BeamFnApi.Metric; +import 
org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfo; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; import org.apache.beam.sdk.metrics.Distribution; @@ -90,36 +95,20 @@ public class FlinkMetricContainer { } /** - * Parse a {@link MetricName} from a {@link - * org.apache.beam.model.fnexecution.v1.BeamFnApi.MonitoringInfoUrns.Enum} - * - * <p>Should be consistent with {@code parse_namespace_and_name} in monitoring_infos.py - * - * <p>TODO: not flink-specific; where should it live? + * Update this container with metrics from the passed {@link MonitoringInfo}s, and send updates + * along to Flink's internal metrics framework. */ - public static MetricName parseUrn(String urn) { - if (urn.startsWith(USER_COUNTER_URN_PREFIX)) { - urn = urn.substring(USER_COUNTER_URN_PREFIX.length()); - } - // If it is not a user counter, just use the first part of the URN, i.e. 'beam' - String[] pieces = urn.split(":", 2); - if (pieces.length != 2) { - throw new IllegalArgumentException("Invalid metric URN: " + urn); - } - return MetricName.named(pieces[0], pieces[1]); - } - - public void updateMetrics(String stepName, List<BeamFnApi.MonitoringInfo> monitoringInfos) { + public void updateMetrics(String stepName, List<MonitoringInfo> monitoringInfos) { MetricsContainer metricsContainer = getMetricsContainer(stepName); monitoringInfos.forEach( monitoringInfo -> { if (monitoringInfo.hasMetric()) { String urn = monitoringInfo.getUrn(); MetricName metricName = parseUrn(urn); - BeamFnApi.Metric metric = monitoringInfo.getMetric(); + Metric metric = monitoringInfo.getMetric(); if (metric.hasCounterData()) { - BeamFnApi.CounterData counterData = metric.getCounterData(); - if (counterData.getValueCase() == BeamFnApi.CounterData.ValueCase.INT64_VALUE) { + CounterData counterData = metric.getCounterData(); + if (counterData.getValueCase() == CounterData.ValueCase.INT64_VALUE) { 
org.apache.beam.sdk.metrics.Counter counter = metricsContainer.getCounter(metricName); counter.inc(counterData.getInt64Value()); @@ -127,11 +116,10 @@ public class FlinkMetricContainer { LOG.warn("Unsupported CounterData type: {}", counterData); } } else if (metric.hasDistributionData()) { - BeamFnApi.DistributionData distributionData = metric.getDistributionData(); + DistributionData distributionData = metric.getDistributionData(); if (distributionData.hasIntDistributionData()) { Distribution distribution = metricsContainer.getDistribution(metricName); - BeamFnApi.IntDistributionData intDistributionData = - distributionData.getIntDistributionData(); + IntDistributionData intDistributionData = distributionData.getIntDistributionData(); distribution.update( intDistributionData.getSum(), intDistributionData.getCount(), @@ -141,7 +129,7 @@ public class FlinkMetricContainer { LOG.warn("Unsupported DistributionData type: {}", distributionData); } } else if (metric.hasExtremaData()) { - BeamFnApi.ExtremaData extremaData = metric.getExtremaData(); + ExtremaData extremaData = metric.getExtremaData(); LOG.warn("Extrema metric unsupported: {}", extremaData); } } @@ -149,6 +137,10 @@ public class FlinkMetricContainer { updateMetrics(stepName); } + /** + * Update Flink's internal metrics ({@link #flinkCounterCache}) with the latest metrics for a + * given step. + */ void updateMetrics(String stepName) { MetricResults metricResults = asAttemptedOnlyMetricResults(metricsAccumulator.getLocalValue()); MetricQueryResults metricQueryResults =