Repository: beam Updated Branches: refs/heads/master fe441e34b -> 85b820c37
[BEAM-1792] Use MetricFiltering in Spark runner. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/241ded90 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/241ded90 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/241ded90 Branch: refs/heads/master Commit: 241ded9022a9214c1d0768b1cb3c7a740a409873 Parents: fe441e3 Author: Pablo <pabl...@google.com> Authored: Fri Mar 24 10:48:43 2017 -0700 Committer: Aviem Zur <aviem...@gmail.com> Committed: Tue Mar 28 05:51:14 2017 +0300 ---------------------------------------------------------------------- .../spark/metrics/SparkMetricResults.java | 40 +------------------- 1 file changed, 2 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/241ded90/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java index c02027a..faf4c52 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/metrics/SparkMetricResults.java @@ -19,17 +19,15 @@ package org.apache.beam.runners.spark.metrics; import com.google.common.base.Function; -import com.google.common.base.Objects; import com.google.common.base.Predicate; import com.google.common.collect.FluentIterable; -import java.util.Set; import org.apache.beam.sdk.metrics.DistributionData; import org.apache.beam.sdk.metrics.DistributionResult; import org.apache.beam.sdk.metrics.GaugeData; import org.apache.beam.sdk.metrics.GaugeResult; +import org.apache.beam.sdk.metrics.MetricFiltering; import org.apache.beam.sdk.metrics.MetricKey; import org.apache.beam.sdk.metrics.MetricName; -import org.apache.beam.sdk.metrics.MetricNameFilter; import org.apache.beam.sdk.metrics.MetricQueryResults; import org.apache.beam.sdk.metrics.MetricResult; import org.apache.beam.sdk.metrics.MetricResults; @@ -88,44 +86,10 @@ public class SparkMetricResults extends MetricResults { return new Predicate<MetricUpdate<?>>() { @Override public boolean apply(MetricUpdate<?> metricResult) { - return matches(filter, metricResult.getKey()); + return MetricFiltering.matches(filter, metricResult.getKey()); } }; } - - private boolean matches(MetricsFilter filter, MetricKey key) { - return matchesName(key.metricName(), filter.names()) - && matchesScope(key.stepName(), filter.steps()); - } - - private boolean matchesName(MetricName metricName, Set<MetricNameFilter> nameFilters) { - if (nameFilters.isEmpty()) { - return true; - } - - for (MetricNameFilter nameFilter : nameFilters) { - if ((nameFilter.getName() == null || nameFilter.getName().equals(metricName.name())) - && Objects.equal(metricName.namespace(), nameFilter.getNamespace())) { - return true; - } - } - - return false; - } - - private boolean matchesScope(String actualScope, Set<String> scopes) { - if (scopes.isEmpty() || scopes.contains(actualScope)) { - return true; - } - - for (String scope : scopes) { - if (actualScope.startsWith(scope)) { - return true; - } - } - - return false; - } } private static final Function<MetricUpdate<DistributionData>, MetricResult<DistributionResult>>