[ https://issues.apache.org/jira/browse/BEAM-3803?focusedWorklogId=78948&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-78948 ]
ASF GitHub Bot logged work on BEAM-3803: ---------------------------------------- Author: ASF GitHub Bot Created on: 09/Mar/18 17:23 Start Date: 09/Mar/18 17:23 Worklog Time Spent: 10m Work Description: apilloud closed pull request #4823: [BEAM-3803] [Nexmark] Handle both committed and attempted metrics URL: https://github.com/apache/beam/pull/4823 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java index 9a3971a931f..9585517e185 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.metrics; +import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; @@ -32,14 +33,13 @@ /** * Return the value of this metric across all successfully completed parts of the pipeline. - * - * <p>Not all runners will support committed metrics. If they are not supported, the runner will - * throw an {@link UnsupportedOperationException}. */ + @Nullable T committed(); /** * Return the value of this metric across all attempts of executing all parts of the pipeline. 
*/ + @Nullable T attempted(); } diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java index d634260dc1d..f8ceda14a13 100644 --- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java +++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java @@ -190,7 +190,7 @@ private int maxNumWorkers() { /** * Return the current value for a long counter, or a default value if can't be retrieved. - * Note this uses only attempted metrics because some runners don't support committed metrics. + * Note some runners don't support committed metrics and some don't support attempted metrics. */ private long getCounterMetric(PipelineResult result, String namespace, String name, long defaultValue) { @@ -199,8 +199,12 @@ private long getCounterMetric(PipelineResult result, String namespace, String na Iterable<MetricResult<Long>> counters = metrics.counters(); try { MetricResult<Long> metricResult = counters.iterator().next(); - return metricResult.attempted(); - } catch (NoSuchElementException e) { + Long value = metricResult.committed(); + if (value == null) { + value = metricResult.attempted(); + } + return value; + } catch (NoSuchElementException | NullPointerException e) { LOG.error("Failed to get metric {}, from namespace {}", name, namespace); } return defaultValue; @@ -208,7 +212,7 @@ private long getCounterMetric(PipelineResult result, String namespace, String na /** * Return the current value for a long counter, or a default value if can't be retrieved. - * Note this uses only attempted metrics because some runners don't support committed metrics. + * Note some runners don't support committed metrics and some don't support attempted metrics. 
*/ private long getDistributionMetric(PipelineResult result, String namespace, String name, DistributionType distType, long defaultValue) { @@ -216,16 +220,20 @@ private long getDistributionMetric(PipelineResult result, String namespace, Stri MetricsFilter.builder().addNameFilter(MetricNameFilter.named(namespace, name)).build()); Iterable<MetricResult<DistributionResult>> distributions = metrics.distributions(); try { - MetricResult<DistributionResult> distributionResult = distributions.iterator().next(); + MetricResult<DistributionResult> distributionResultMetric = distributions.iterator().next(); + DistributionResult distributionResult = distributionResultMetric.committed(); + if (distributionResult == null) { + distributionResult = distributionResultMetric.attempted(); + } switch (distType) { case MIN: - return distributionResult.attempted().min(); + return distributionResult.min(); case MAX: - return distributionResult.attempted().max(); + return distributionResult.max(); default: return defaultValue; } - } catch (NoSuchElementException e) { + } catch (NoSuchElementException | NullPointerException e) { LOG.error( "Failed to get distribution metric {} for namespace {}", name, ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
-------------------

Worklog Id: (was: 78948)
Time Spent: 3h (was: 2h 50m)

> Dataflow runner should handle metrics per the spec
> --------------------------------------------------
>
> Key: BEAM-3803
> URL: https://issues.apache.org/jira/browse/BEAM-3803
> Project: Beam
> Issue Type: Bug
> Components: runner-dataflow
> Reporter: Andrew Pilloud
> Assignee: Andrew Pilloud
> Priority: Major
> Labels: nexmark
> Time Spent: 3h
> Remaining Estimate: 0h
>
> The Dataflow runner only supports committed metrics for batch jobs and
> attempted metrics for streaming jobs. Nexmark should use the best available
> metric source.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)