This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c9ffd63  Metrics aggregation for multiple bundle runs.
     new 2acc044  Merge pull request #7841 from robertwb/merge-metrics
c9ffd63 is described below

commit c9ffd630566ab2ba05ddcd6cc03d44081e246acd
Author: Robert Bradshaw <rober...@google.com>
AuthorDate: Thu Feb 14 10:18:51 2019 +0100

    Metrics aggregation for multiple bundle runs.
---
 .../python/apache_beam/metrics/monitoring_infos.py | 45 +++++++++++++++++++++-
 .../runners/portability/fn_api_runner.py           | 11 +++++-
 .../runners/portability/fn_api_runner_test.py      |  9 +++++
 3 files changed, 63 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/metrics/monitoring_infos.py 
b/sdks/python/apache_beam/metrics/monitoring_infos.py
index bf1fd8b..94106c6 100644
--- a/sdks/python/apache_beam/metrics/monitoring_infos.py
+++ b/sdks/python/apache_beam/metrics/monitoring_infos.py
@@ -20,7 +20,9 @@
 
 from __future__ import absolute_import
 
+import collections
 import time
+from functools import reduce
 
 from google.protobuf import timestamp_pb2
 
@@ -241,6 +243,47 @@ def to_key(monitoring_info_proto):
 
   This is useful in maps to prevent reporting the same MonitoringInfo twice.
   """
-  key_items = [i for i in monitoring_info_proto.labels.items()]
+  key_items = list(monitoring_info_proto.labels.items())
   key_items.append(monitoring_info_proto.urn)
   return frozenset(key_items)
+
+
+_KNOWN_COMBINERS = {
+    SUM_INT64_TYPE: lambda a, b: Metric(
+        counter_data=CounterData(
+            int64_value=
+            a.counter_data.int64_value + b.counter_data.int64_value)),
+    # TODO: Distributions, etc.
+}
+
+
+def max_timestamp(a, b):
+  if a.ToNanoseconds() > b.ToNanoseconds():
+    return a
+  else:
+    return b
+
+
+def consolidate(metrics, key=to_key):
+  grouped = collections.defaultdict(list)
+  for metric in metrics:
+    grouped[key(metric)].append(metric)
+  for values in grouped.values():
+    if len(values) == 1:
+      yield values[0]
+    else:
+      combiner = _KNOWN_COMBINERS.get(values[0].type)
+      if combiner:
+        def merge(a, b):
+          # pylint: disable=cell-var-from-loop
+          return MonitoringInfo(
+              urn=a.urn,
+              type=a.type,
+              labels=dict((label, value) for label, value in a.labels.items()
+                          if b.labels.get(label) == value),
+              metric=combiner(a.metric, b.metric),
+              timestamp=max_timestamp(a.timestamp, b.timestamp))
+        yield reduce(merge, values)
+      else:
+        for value in values:
+          yield value
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py 
b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 6890af6..e4cd6a4 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -23,6 +23,7 @@ from __future__ import print_function
 import collections
 import contextlib
 import copy
+import itertools
 import logging
 import os
 import queue
@@ -271,8 +272,9 @@ class FnApiRunner(runner.PipelineRunner):
         pipeline_options.DirectOptions).direct_runner_bundle_repeat
     self._profiler_factory = profiler.Profile.factory_from_options(
         options.view_as(pipeline_options.ProfilingOptions))
-    return self.run_via_runner_api(pipeline.to_runner_api(
+    self._latest_run_result = self.run_via_runner_api(pipeline.to_runner_api(
         default_environment=self._default_environment))
+    return self._latest_run_result
 
   def run_via_runner_api(self, pipeline_proto):
     return self.run_stages(*self.create_stages(pipeline_proto))
@@ -608,6 +610,13 @@ class FnApiRunner(runner.PipelineRunner):
             self._progress_frequency,
             True).process_bundle(deferred_inputs, data_output)
         last_sent = deferred_inputs
+        result = beam_fn_api_pb2.InstructionResponse(
+            process_bundle=beam_fn_api_pb2.ProcessBundleResponse(
+                monitoring_infos=monitoring_infos.consolidate(
+                    itertools.chain(
+                        result.process_bundle.monitoring_infos,
+                        last_result.process_bundle.monitoring_infos))),
+            error=result.error or last_result.error)
       else:
         break
 
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py 
b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index aadf4a8..0de632a 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -358,6 +358,8 @@ class FnApiRunnerTest(unittest.TestCase):
 
   def test_sdf(self):
 
+    counter = beam.metrics.Metrics.counter('ns', 'my_counter')
+
     class ExpandStringsProvider(beam.transforms.core.RestrictionProvider):
       def initial_restriction(self, element):
         return (0, len(element))
@@ -379,6 +381,7 @@ class FnApiRunnerTest(unittest.TestCase):
         for k in range(*restriction_tracker.current_restriction()):
           if not restriction_tracker.try_claim(k):
             return
+          counter.inc()
           yield element[k]
           if k % 2 == 1:
             restriction_tracker.defer_remainder()
@@ -393,6 +396,12 @@ class FnApiRunnerTest(unittest.TestCase):
 
       assert_that(actual, equal_to(list(''.join(data))))
 
+    if isinstance(p.runner, fn_api_runner.FnApiRunner):
+      res = p.runner._latest_run_result
+      counters = res.metrics().query(beam.metrics.MetricsFilter())['counters']
+      self.assertEqual(1, len(counters))
+      self.assertEqual(counters[0].committed, len(''.join(data)))
+
   def test_group_by_key(self):
     with self.create_pipeline() as p:
       res = (p

Reply via email to