Adding per-stage matching to metrics filters
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b34e50c6 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b34e50c6 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b34e50c6 Branch: refs/heads/master Commit: b34e50c649adc8861670c42228cb96688abf4038 Parents: afd7106 Author: Pablo <pabl...@google.com> Authored: Mon Feb 27 17:28:26 2017 -0800 Committer: bchambers <bchamb...@google.com> Committed: Wed Mar 1 14:59:22 2017 -0800 ---------------------------------------------------------------------- .../beam/runners/direct/DirectMetrics.java | 31 ++++++++- .../beam/runners/direct/DirectMetricsTest.java | 70 ++++++++++++++++++++ sdks/python/apache_beam/metrics/metric.py | 53 ++++++++++++--- sdks/python/apache_beam/metrics/metric_test.py | 43 ++++++++++++ 4 files changed, 187 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b34e50c6/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java index 145326f..fa8f9c3 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java @@ -275,13 +275,40 @@ class DirectMetrics extends MetricResults { && matchesScope(key.stepName(), filter.steps()); } - private boolean matchesScope(String actualScope, Set<String> scopes) { + /** + * {@code subPathMatches(haystack, needle)} returns true if {@code needle} + * represents a path within {@code haystack}. For example, "foo/bar" is in "a/foo/bar/b", + * but not "a/fool/bar/b" or "a/foo/bart/b". + */ + public boolean subPathMatches(String haystack, String needle) { + int location = haystack.indexOf(needle); + int end = location + needle.length(); + if (location == -1) { + return false; // needle not found + } else if (location != 0 && haystack.charAt(location - 1) != '/') { + return false; // the first entry in needle wasn't exactly matched + } else if (end != haystack.length() && haystack.charAt(end) != '/') { + return false; // the last entry in needle wasn't exactly matched + } else { + return true; + } + } + + /** + * {@code matchesScope(actualScope, scopes)} returns true if the scope of a metric is matched + * by any of the filters in {@code scopes}. A metric scope is a path of type "A/B/D". A + * path is matched by a filter if the filter is equal to the path (e.g. "A/B/D", or + * if it represents a subpath within it (e.g. "A/B" or "B/D", but not "A/D"). */ + public boolean matchesScope(String actualScope, Set<String> scopes) { if (scopes.isEmpty() || scopes.contains(actualScope)) { return true; } + // If there is no perfect match, a stage name-level match is tried. + // This is done by a substring search over the levels of the scope. + // e.g. a scope "A/B/C/D" is matched by "A/B", but not by "A/C". for (String scope : scopes) { - if (actualScope.startsWith(scope)) { + if (subPathMatches(actualScope, scope)) { return true; } } http://git-wip-us.apache.org/repos/asf/beam/blob/b34e50c6/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java index 3ad2bdc..77229bf 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectMetricsTest.java @@ -23,9 +23,13 @@ import static org.apache.beam.sdk.metrics.MetricMatchers.committedMetricsResult; import static org.apache.beam.sdk.metrics.MetricNameFilter.inNamespace; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import com.google.common.collect.ImmutableList; +import java.util.HashSet; +import java.util.Set; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.metrics.DistributionData; import org.apache.beam.sdk.metrics.DistributionResult; @@ -125,6 +129,72 @@ public class DirectMetricsTest { committedMetricsResult("ns1", "name1", "step2", 0L))); } + private boolean matchesSubPath(String actualScope, String subPath) { + return metrics.subPathMatches(actualScope, subPath); + } + + @Test + public void testMatchesSubPath() { + assertTrue("Match of the first element", + matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top1")); + assertTrue("Match of the first elements", + matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1")); + assertTrue("Match of the last elements", + matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Inner1/Bottom1")); + assertFalse("Substring match but no subpath match", + matchesSubPath("Top1/Outer1/Inner1/Bottom1", "op1/Outer1/Inner1")); + assertFalse("Substring match from start - but no subpath match", + matchesSubPath("Top1/Outer1/Inner1/Bottom1", "Top")); + } + + private boolean matchesScopeWithSingleFilter(String actualScope, String filter) { + Set<String> scopeFilter = new HashSet<String>(); + scopeFilter.add(filter); + return metrics.matchesScope(actualScope, scopeFilter); + } + + @Test + public void testMatchesScope() { + assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1")); + assertTrue(matchesScopeWithSingleFilter( + "Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inner1/Bottom1")); + assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1")); + assertTrue(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inner1")); + assertFalse(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Inner1")); + assertFalse(matchesScopeWithSingleFilter("Top1/Outer1/Inner1/Bottom1", "Top1/Outer1/Inn")); + } + + @SuppressWarnings("unchecked") + @Test + public void testPartialScopeMatchingInMetricsQuery() { + metrics.updatePhysical(bundle1, MetricUpdates.create( + ImmutableList.of( + MetricUpdate.create(MetricKey.create("Top1/Outer1/Inner1", NAME1), 5L), + MetricUpdate.create(MetricKey.create("Top1/Outer1/Inner2", NAME1), 8L)), + ImmutableList.<MetricUpdate<DistributionData>>of())); + metrics.updatePhysical(bundle1, MetricUpdates.create( + ImmutableList.of( + MetricUpdate.create(MetricKey.create("Top2/Outer1/Inner1", NAME1), 12L), + MetricUpdate.create(MetricKey.create("Top1/Outer2/Inner2", NAME1), 18L)), + ImmutableList.<MetricUpdate<DistributionData>>of())); + + MetricQueryResults results = metrics.queryMetrics( + MetricsFilter.builder().addStep("Top1/Outer1").build()); + + assertThat(results.counters(), + containsInAnyOrder( + attemptedMetricsResult("ns1", "name1", "Top1/Outer1/Inner1", 5L), + attemptedMetricsResult("ns1", "name1", "Top1/Outer1/Inner2", 8L))); + + results = metrics.queryMetrics( + MetricsFilter.builder().addStep("Inner2").build()); + + assertThat(results.counters(), + containsInAnyOrder( + attemptedMetricsResult("ns1", "name1", "Top1/Outer1/Inner2", 8L), + attemptedMetricsResult("ns1", "name1", "Top1/Outer2/Inner2", 18L))); + } + @SuppressWarnings("unchecked") @Test public void testApplyAttemptedQueryCompositeScope() { http://git-wip-us.apache.org/repos/asf/beam/blob/b34e50c6/sdks/python/apache_beam/metrics/metric.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/metrics/metric.py b/sdks/python/apache_beam/metrics/metric.py index a0e3cba..f6a0923 100644 --- a/sdks/python/apache_beam/metrics/metric.py +++ b/sdks/python/apache_beam/metrics/metric.py @@ -32,8 +32,7 @@ from apache_beam.metrics.metricbase import MetricName class Metrics(object): - """Lets users create/access metric objects during pipeline execution. - """ + """Lets users create/access metric objects during pipeline execution.""" @staticmethod def get_namespace(namespace): if inspect.isclass(namespace): @@ -93,14 +92,52 @@ class Metrics(object): class MetricResults(object): + + @staticmethod + def _matches_name(filter, metric_key): + if not filter.names and not filter.namespaces: + return True + + if ((filter.namespaces and + metric_key.metric.namespace in filter.namespaces) or + (filter.names and + metric_key.metric.name in filter.names)): + return True + else: + return False + + @staticmethod + def _matches_sub_path(actual_scope, filter_scope): + start_pos = actual_scope.find(filter_scope) + end_pos = start_pos + len(filter_scope) + + if start_pos == -1: + return False # No match at all + elif start_pos != 0 and actual_scope[start_pos - 1] != '/': + return False # The first entry was not exactly matched + elif end_pos != len(actual_scope) and actual_scope[end_pos] != '/': + return False # The last entry was not exactly matched + else: + return True + + @staticmethod + def _matches_scope(filter, metric_key): + if not filter.steps: + return True + + for step in filter.steps: + if MetricResults._matches_sub_path(metric_key.step, step): + return True + + return False + @staticmethod def matches(filter, metric_key): if filter is None: return True - if (metric_key.step in filter.steps and - metric_key.metric.namespace in filter.namespaces and - metric_key.metric.name in filter.names): + if (MetricResults._matches_name(filter, metric_key) and + MetricResults._matches_scope(filter, metric_key)): return True else: return False @@ -139,9 +176,9 @@ class MetricsFilter(object): def with_names(self, names): if isinstance(names, str): - raise ValueError('Names must be an iterable, not a string') + raise ValueError('Names must be a collection, not a string') - self._steps.update(names) + self._names.update(names) return self def with_namespace(self, namespace): @@ -158,7 +195,7 @@ class MetricsFilter(object): return self.with_steps([step]) def with_steps(self, steps): - if isinstance(namespaces, str): + if isinstance(steps, str): raise ValueError('Steps must be an iterable, not a string') self._steps.update(steps) http://git-wip-us.apache.org/repos/asf/beam/blob/b34e50c6/sdks/python/apache_beam/metrics/metric_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/metrics/metric_test.py b/sdks/python/apache_beam/metrics/metric_test.py index 4860edf..56b7468 100644 --- a/sdks/python/apache_beam/metrics/metric_test.py +++ b/sdks/python/apache_beam/metrics/metric_test.py @@ -22,6 +22,8 @@ from apache_beam.metrics.execution import MetricKey from apache_beam.metrics.execution import MetricsContainer from apache_beam.metrics.execution import MetricsEnvironment from apache_beam.metrics.metric import Metrics +from apache_beam.metrics.metric import MetricsFilter +from apache_beam.metrics.metric import MetricResults from apache_beam.metrics.metricbase import MetricName @@ -39,6 +41,47 @@ class NameTest(unittest.TestCase): self.assertEqual(key, MetricKey('step1', MetricName('namespace1', 'name1'))) +class MetricResultsTest(unittest.TestCase): + + def test_metric_filter_namespace_matching(self): + filter = MetricsFilter().with_namespace('ns1') + name = MetricName('ns1', 'name1') + key = MetricKey('step1', name) + self.assertTrue(MetricResults.matches(filter, key)) + + def test_metric_filter_name_matching(self): + filter = MetricsFilter().with_name('name1').with_namespace('ns1') + name = MetricName('ns1', 'name1') + key = MetricKey('step1', name) + self.assertTrue(MetricResults.matches(filter, key)) + + filter = MetricsFilter().with_name('name1') + name = MetricName('ns1', 'name1') + key = MetricKey('step1', name) + self.assertTrue(MetricResults.matches(filter, key)) + + def test_metric_filter_step_matching(self): + filter = MetricsFilter().with_step('Top1/Outer1/Inner1') + name = MetricName('ns1', 'name1') + key = MetricKey('Top1/Outer1/Inner1', name) + self.assertTrue(MetricResults.matches(filter, key)) + + filter = MetricsFilter().with_step('step1') + name = MetricName('ns1', 'name1') + key = MetricKey('step1', name) + self.assertTrue(MetricResults.matches(filter, key)) + + filter = MetricsFilter().with_step('Top1/Outer1') + name = MetricName('ns1', 'name1') + key = MetricKey('Top1/Outer1/Inner1', name) + self.assertTrue(MetricResults.matches(filter, key)) + + filter = MetricsFilter().with_step('Top1/Inner1') + name = MetricName('ns1', 'name1') + key = MetricKey('Top1/Outer1/Inner1', name) + self.assertFalse(MetricResults.matches(filter, key)) + + class MetricsTest(unittest.TestCase): def test_get_namespace_class(self): class MyClass(object):