This is an automated email from the ASF dual-hosted git repository. xinyu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 0a0bff5a049 Support gauge metrics in portable mode (#25396) 0a0bff5a049 is described below commit 0a0bff5a049a17864e66bc34f36e40836238a653 Author: Katie Liu <kati...@linkedin.com> AuthorDate: Mon Feb 13 09:49:01 2023 -0800 Support gauge metrics in portable mode (#25396) --- .../runners/core/metrics/MetricsContainerImpl.java | 47 +++++++++++++++++++--- .../core/metrics/MonitoringInfoConstants.java | 2 + .../core/metrics/MetricsContainerImplTest.java | 35 ++++++++++++++++ 3 files changed, 78 insertions(+), 6 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java index 9c0c2a46c27..c23c2bbfa08 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java @@ -25,10 +25,12 @@ import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decod import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.decodeInt64Gauge; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Counter; import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Distribution; +import static org.apache.beam.runners.core.metrics.MonitoringInfoEncodings.encodeInt64Gauge; import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; import java.io.Serializable; import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; @@ -281,7 +283,7 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer { * @return The MonitoringInfo generated from the distribution metricUpdate. 
*/ private @Nullable MonitoringInfo distributionUpdateToMonitoringInfo( - MetricUpdate<org.apache.beam.runners.core.metrics.DistributionData> metricUpdate) { + MetricUpdate<DistributionData> metricUpdate) { SimpleMonitoringInfoBuilder builder = distributionToMonitoringMetadata(metricUpdate.getKey()); if (builder == null) { return null; @@ -290,11 +292,33 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer { return builder.build(); } + /** @return The MonitoringInfo metadata from the gauge metric. */ + private @Nullable SimpleMonitoringInfoBuilder gaugeToMonitoringMetadata(MetricKey metricKey) { + return metricToMonitoringMetadata( + metricKey, + MonitoringInfoConstants.TypeUrns.LATEST_INT64_TYPE, + MonitoringInfoConstants.Urns.USER_LATEST_INT64); + } + + /** + * @param metricUpdate + * @return The MonitoringInfo generated from the gauge metricUpdate. + */ + private @Nullable MonitoringInfo gaugeUpdateToMonitoringInfo( + MetricUpdate<GaugeData> metricUpdate) { + SimpleMonitoringInfoBuilder builder = gaugeToMonitoringMetadata(metricUpdate.getKey()); + if (builder == null) { + return null; + } + builder.setInt64LatestValue(metricUpdate.getUpdate()); + return builder.build(); + } + /** Return the cumulative values for any metrics in this container as MonitoringInfos. */ @Override public Iterable<MonitoringInfo> getMonitoringInfos() { // Extract user metrics and store as MonitoringInfos. 
- ArrayList<MonitoringInfo> monitoringInfos = new ArrayList<MonitoringInfo>(); + List<MonitoringInfo> monitoringInfos = new ArrayList<>(); MetricUpdates metricUpdates = this.getUpdates(); for (MetricUpdate<Long> metricUpdate : metricUpdates.counterUpdates()) { @@ -304,13 +328,19 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer { } } - for (MetricUpdate<org.apache.beam.runners.core.metrics.DistributionData> metricUpdate : - metricUpdates.distributionUpdates()) { + for (MetricUpdate<DistributionData> metricUpdate : metricUpdates.distributionUpdates()) { MonitoringInfo mi = distributionUpdateToMonitoringInfo(metricUpdate); if (mi != null) { monitoringInfos.add(mi); } } + + for (MetricUpdate<GaugeData> metricUpdate : metricUpdates.gaugeUpdates()) { + MonitoringInfo mi = gaugeUpdateToMonitoringInfo(metricUpdate); + if (mi != null) { + monitoringInfos.add(mi); + } + } return monitoringInfos; } @@ -324,14 +354,19 @@ public class MetricsContainerImpl implements Serializable, MetricsContainer { builder.put(shortId, encodeInt64Counter(metricUpdate.getUpdate())); } } - for (MetricUpdate<org.apache.beam.runners.core.metrics.DistributionData> metricUpdate : - metricUpdates.distributionUpdates()) { + for (MetricUpdate<DistributionData> metricUpdate : metricUpdates.distributionUpdates()) { String shortId = getShortId(metricUpdate.getKey(), this::distributionToMonitoringMetadata, shortIds); if (shortId != null) { builder.put(shortId, encodeInt64Distribution(metricUpdate.getUpdate())); } } + for (MetricUpdate<GaugeData> metricUpdate : metricUpdates.gaugeUpdates()) { + String shortId = getShortId(metricUpdate.getKey(), this::gaugeToMonitoringMetadata, shortIds); + if (shortId != null) { + builder.put(shortId, encodeInt64Gauge(metricUpdate.getUpdate())); + } + } return builder.build(); } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java index 2808ae58cf1..968c58ab331 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MonitoringInfoConstants.java @@ -43,6 +43,8 @@ public final class MonitoringInfoConstants { public static final String FINISH_BUNDLE_MSECS = "beam:metric:pardo_execution_time:finish_bundle_msecs:v1"; public static final String TOTAL_MSECS = extractUrn(MonitoringInfoSpecs.Enum.TOTAL_MSECS); + public static final String USER_LATEST_INT64 = + extractUrn(MonitoringInfoSpecs.Enum.USER_LATEST_INT64); public static final String USER_SUM_INT64 = extractUrn(MonitoringInfoSpecs.Enum.USER_SUM_INT64); public static final String USER_SUM_DOUBLE = extractUrn(MonitoringInfoSpecs.Enum.USER_SUM_DOUBLE); diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java index 2642a9191ff..146b7df10f0 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/metrics/MetricsContainerImplTest.java @@ -31,6 +31,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo; @@ -234,6 +235,40 @@ public class MetricsContainerImplTest { assertThat(actualMonitoringInfos, containsInAnyOrder(builder1.build(), builder2.build())); } + @Test + public void testMonitoringInfosArePopulatedForUserGauges() { + MetricsContainerImpl testObject = new MetricsContainerImpl("step1"); + GaugeCell gaugeCell1 = testObject.getGauge(MetricName.named("ns", 
"name1")); + GaugeCell gaugeCell2 = testObject.getGauge(MetricName.named("ns", "name2")); + GaugeData gaugeData1 = GaugeData.create(3L); + GaugeData gaugeData2 = GaugeData.create(4L); + gaugeCell1.update(gaugeData1); + gaugeCell2.update(gaugeData2); + + SimpleMonitoringInfoBuilder builder1 = new SimpleMonitoringInfoBuilder(); + builder1 + .setUrn(MonitoringInfoConstants.Urns.USER_LATEST_INT64) + .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "ns") + .setLabel(MonitoringInfoConstants.Labels.NAME, "name1") + .setInt64LatestValue(gaugeData1) + .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1"); + + SimpleMonitoringInfoBuilder builder2 = new SimpleMonitoringInfoBuilder(); + builder2 + .setUrn(MonitoringInfoConstants.Urns.USER_LATEST_INT64) + .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "ns") + .setLabel(MonitoringInfoConstants.Labels.NAME, "name2") + .setInt64LatestValue(gaugeData2) + .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1"); + + List<MonitoringInfo> actualMonitoringInfos = new ArrayList<>(); + for (MonitoringInfo mi : testObject.getMonitoringInfos()) { + actualMonitoringInfos.add(mi); + } + + assertThat(actualMonitoringInfos, containsInAnyOrder(builder1.build(), builder2.build())); + } + @Test public void testMonitoringInfosArePopulatedForSystemDistributions() { MetricsContainerImpl testObject = new MetricsContainerImpl("step1");