This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new c9ffd63 Metrics aggregation for multiple bundle runs. new 2acc044 Merge pull request #7841 from robertwb/merge-metrics c9ffd63 is described below commit c9ffd630566ab2ba05ddcd6cc03d44081e246acd Author: Robert Bradshaw <rober...@google.com> AuthorDate: Thu Feb 14 10:18:51 2019 +0100 Metrics aggregation for multiple bundle runs. --- .../python/apache_beam/metrics/monitoring_infos.py | 45 +++++++++++++++++++++- .../runners/portability/fn_api_runner.py | 11 +++++- .../runners/portability/fn_api_runner_test.py | 9 +++++ 3 files changed, 63 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/metrics/monitoring_infos.py b/sdks/python/apache_beam/metrics/monitoring_infos.py index bf1fd8b..94106c6 100644 --- a/sdks/python/apache_beam/metrics/monitoring_infos.py +++ b/sdks/python/apache_beam/metrics/monitoring_infos.py @@ -20,7 +20,9 @@ from __future__ import absolute_import +import collections import time +from functools import reduce from google.protobuf import timestamp_pb2 @@ -241,6 +243,47 @@ def to_key(monitoring_info_proto): This is useful in maps to prevent reporting the same MonitoringInfo twice. """ - key_items = [i for i in monitoring_info_proto.labels.items()] + key_items = list(monitoring_info_proto.labels.items()) key_items.append(monitoring_info_proto.urn) return frozenset(key_items) + + +_KNOWN_COMBINERS = { + SUM_INT64_TYPE: lambda a, b: Metric( + counter_data=CounterData( + int64_value= + a.counter_data.int64_value + b.counter_data.int64_value)), + # TODO: Distributions, etc. 
+} + + +def max_timestamp(a, b): + if a.ToNanoseconds() > b.ToNanoseconds(): + return a + else: + return b + + +def consolidate(metrics, key=to_key): + grouped = collections.defaultdict(list) + for metric in metrics: + grouped[key(metric)].append(metric) + for values in grouped.values(): + if len(values) == 1: + yield values[0] + else: + combiner = _KNOWN_COMBINERS.get(values[0].type) + if combiner: + def merge(a, b): + # pylint: disable=cell-var-from-loop + return MonitoringInfo( + urn=a.urn, + type=a.type, + labels=dict((label, value) for label, value in a.labels.items() + if b.labels.get(label) == value), + metric=combiner(a.metric, b.metric), + timestamp=max_timestamp(a.timestamp, b.timestamp)) + yield reduce(merge, values) + else: + for value in values: + yield value diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index 6890af6..e4cd6a4 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -23,6 +23,7 @@ from __future__ import print_function import collections import contextlib import copy +import itertools import logging import os import queue @@ -271,8 +272,9 @@ class FnApiRunner(runner.PipelineRunner): pipeline_options.DirectOptions).direct_runner_bundle_repeat self._profiler_factory = profiler.Profile.factory_from_options( options.view_as(pipeline_options.ProfilingOptions)) - return self.run_via_runner_api(pipeline.to_runner_api( + self._latest_run_result = self.run_via_runner_api(pipeline.to_runner_api( default_environment=self._default_environment)) + return self._latest_run_result def run_via_runner_api(self, pipeline_proto): return self.run_stages(*self.create_stages(pipeline_proto)) @@ -608,6 +610,13 @@ class FnApiRunner(runner.PipelineRunner): self._progress_frequency, True).process_bundle(deferred_inputs, data_output) last_sent = deferred_inputs + result = 
beam_fn_api_pb2.InstructionResponse( + process_bundle=beam_fn_api_pb2.ProcessBundleResponse( + monitoring_infos=monitoring_infos.consolidate( + itertools.chain( + result.process_bundle.monitoring_infos, + last_result.process_bundle.monitoring_infos))), + error=result.error or last_result.error) else: break diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py index aadf4a8..0de632a 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py @@ -358,6 +358,8 @@ class FnApiRunnerTest(unittest.TestCase): def test_sdf(self): + counter = beam.metrics.Metrics.counter('ns', 'my_counter') + class ExpandStringsProvider(beam.transforms.core.RestrictionProvider): def initial_restriction(self, element): return (0, len(element)) @@ -379,6 +381,7 @@ class FnApiRunnerTest(unittest.TestCase): for k in range(*restriction_tracker.current_restriction()): if not restriction_tracker.try_claim(k): return + counter.inc() yield element[k] if k % 2 == 1: restriction_tracker.defer_remainder() @@ -393,6 +396,12 @@ class FnApiRunnerTest(unittest.TestCase): assert_that(actual, equal_to(list(''.join(data)))) + if isinstance(p.runner, fn_api_runner.FnApiRunner): + res = p.runner._latest_run_result + counters = res.metrics().query(beam.metrics.MetricsFilter())['counters'] + self.assertEqual(1, len(counters)) + self.assertEqual(counters[0].committed, len(''.join(data))) + def test_group_by_key(self): with self.create_pipeline() as p: res = (p