Repository: beam
Updated Branches:
  refs/heads/master 0e6b3794c -> 860ac1d1f
Removing aggregators from Python SDK

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/93832aa1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/93832aa1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/93832aa1

Branch: refs/heads/master
Commit: 93832aa16254175d094e9eabc469318316ddf858
Parents: 0e6b379
Author: Pablo <pabl...@google.com>
Authored: Tue Jan 17 11:26:22 2017 -0800
Committer: Ahmet Altay <al...@google.com>
Committed: Wed Feb 1 16:23:18 2017 -0800

----------------------------------------------------------------------
 .../examples/cookbook/datastore_wordcount.py    |  22 ++--
 .../apache_beam/examples/snippets/snippets.py   |  18 +--
 sdks/python/apache_beam/examples/wordcount.py   |  26 ++--
 .../apache_beam/examples/wordcount_debugging.py |  21 ++--
 sdks/python/apache_beam/metrics/metric_test.py  |   2 +-
 sdks/python/apache_beam/runners/common.py       |   5 -
 sdks/python/apache_beam/transforms/__init__.py  |   1 -
 .../python/apache_beam/transforms/aggregator.py | 120 -------------------
 .../apache_beam/transforms/aggregator_test.py   |  77 ------------
 sdks/python/apache_beam/transforms/core.py      |  12 +-
 10 files changed, 43 insertions(+), 261 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/93832aa1/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
index 067cb80..4d00b74 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -74,14 +74,14 @@ import apache_beam as beam
 from apache_beam.io import ReadFromText
 from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore
 from apache_beam.io.datastore.v1.datastoreio import WriteToDatastore
+from apache_beam.metrics import Metrics
 from apache_beam.utils.pipeline_options import GoogleCloudOptions
 from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
 
-empty_line_aggregator = beam.Aggregator('emptyLines')
-average_word_size_aggregator = beam.Aggregator('averageWordLength',
-                                               beam.combiners.MeanCombineFn(),
-                                               float)
+empty_line_counter = Metrics.counter('main', 'empty_lines')
+word_length_counter = Metrics.counter('main', 'word_lengths')
+word_counter = Metrics.counter('main', 'total_words')
 
 
 class WordExtractingDoFn(beam.DoFn):
@@ -91,7 +91,7 @@ class WordExtractingDoFn(beam.DoFn):
     """Returns an iterator over words in contents of Cloud Datastore entity.
 
     The element is a line of text.  If the line is blank, note that, too.
 
     Args:
-      context: the call-specific context: data and aggregator.
+      context: the call-specific context with input data.
 
     Returns:
       The processed element.
""" @@ -101,10 +101,11 @@ class WordExtractingDoFn(beam.DoFn): text_line = content_value.string_value if not text_line: - context.aggregate_to(empty_line_aggregator, 1) + empty_line_counter.inc() words = re.findall(r'[A-Za-z\']+', text_line) for w in words: - context.aggregate_to(average_word_size_aggregator, len(w)) + word_length_counter.inc(len(w)) + word_counter.inc() return words @@ -246,10 +247,9 @@ def run(argv=None): result = read_from_datastore(gcloud_options.project, known_args, pipeline_options) - empty_line_values = result.aggregated_values(empty_line_aggregator) - logging.info('number of empty lines: %d', sum(empty_line_values.values())) - word_length_values = result.aggregated_values(average_word_size_aggregator) - logging.info('average word lengths: %s', word_length_values.values()) + result.metrics().query() + #TODO(pabloem)(BEAM-1366) Fix these once metrics are 100% queriable. + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) http://git-wip-us.apache.org/repos/asf/beam/blob/93832aa1/sdks/python/apache_beam/examples/snippets/snippets.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 631ab2d..9b775e2 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -32,6 +32,7 @@ string. The tags can contain only letters, digits and _. import apache_beam as beam from apache_beam.test_pipeline import TestPipeline +from apache_beam.metrics import Metrics # Quiet some pylint warnings that happen because of the somewhat special # format for the code snippets. @@ -516,13 +517,12 @@ def examples_wordcount_debugging(renames): class FilterTextFn(beam.DoFn): """A DoFn that filters for a specific key based on a regular expression.""" - # A custom aggregator can track values in your pipeline as it runs. Create - # custom aggregators matched_word and unmatched_words. - matched_words = beam.Aggregator('matched_words') - umatched_words = beam.Aggregator('umatched_words') - def __init__(self, pattern): self.pattern = pattern + # A custom metric can track values in your pipeline as it runs. Create + # custom metrics matched_word and unmatched_words. + self.matched_words = Metrics.counter(self.__class__, 'matched_words') + self.umatched_words = Metrics.counter(self.__class__, 'umatched_words') def process(self, context): word, _ = context.element @@ -532,8 +532,8 @@ def examples_wordcount_debugging(renames): # Logging UI. logging.info('Matched %s', word) - # Add 1 to the custom aggregator matched_words - context.aggregate_to(self.matched_words, 1) + # Add 1 to the custom metric counter matched_words + self.matched_words.inc() yield context.element else: # Log at the "DEBUG" level each element that is not matched. Different @@ -543,8 +543,8 @@ def examples_wordcount_debugging(renames): # Logger. This log message will not be visible in the Cloud Logger. 
         logging.debug('Did not match %s', word)
 
-        # Add 1 to the custom aggregator umatched_words
-        context.aggregate_to(self.umatched_words, 1)
+        # Add 1 to the custom metric counter umatched_words
+        self.umatched_words.inc()
   # [END example_wordcount_debugging_logging]
   # [END example_wordcount_debugging_aggregators]


http://git-wip-us.apache.org/repos/asf/beam/blob/93832aa1/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index 92929af..3a120fd 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -26,36 +26,38 @@ import re
 
 import apache_beam as beam
 from apache_beam.io import ReadFromText
 from apache_beam.io import WriteToText
+from apache_beam.metrics import Metrics
 from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
 
-empty_line_aggregator = beam.Aggregator('emptyLines')
-average_word_size_aggregator = beam.Aggregator('averageWordLength',
-                                               beam.combiners.MeanCombineFn(),
-                                               float)
-
-
 class WordExtractingDoFn(beam.DoFn):
   """Parse each line of input text into words."""
 
+  def __init__(self):
+    super(WordExtractingDoFn, self).__init__()
+    self.words_counter = Metrics.counter(self.__class__, 'words')
+    self.word_lengths_counter = Metrics.counter(self.__class__, 'word_lengths')
+    self.empty_line_counter = Metrics.counter(self.__class__, 'empty_lines')
+
   def process(self, context):
     """Returns an iterator over the words of this element.
 
     The element is a line of text.  If the line is blank, note that, too.
 
     Args:
-      context: the call-specific context: data and aggregator.
+      context: the call-specific context.
 
     Returns:
       The processed element.
     """
     text_line = context.element.strip()
     if not text_line:
-      context.aggregate_to(empty_line_aggregator, 1)
+      self.empty_line_counter.inc(1)
     words = re.findall(r'[A-Za-z\']+', text_line)
     for w in words:
-      context.aggregate_to(average_word_size_aggregator, len(w))
+      self.words_counter.inc()
+      self.word_lengths_counter.inc(len(w))
     return words
 
@@ -98,11 +100,7 @@ def run(argv=None):
 
   # Actually run the pipeline (all operations above are deferred).
   result = p.run()
   result.wait_until_finish()
-  empty_line_values = result.aggregated_values(empty_line_aggregator)
-  logging.info('number of empty lines: %d', sum(empty_line_values.values()))
-  word_length_values = result.aggregated_values(average_word_size_aggregator)
-  logging.info('average word lengths: %s', word_length_values.values())
-
+  #TODO(pabloem)(BEAM-1366) Add querying of metrics once they are queriable.
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)


http://git-wip-us.apache.org/repos/asf/beam/blob/93832aa1/sdks/python/apache_beam/examples/wordcount_debugging.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py
index ac13f35..ac24660 100644
--- a/sdks/python/apache_beam/examples/wordcount_debugging.py
+++ b/sdks/python/apache_beam/examples/wordcount_debugging.py
@@ -48,25 +48,22 @@ import re
 
 import apache_beam as beam
 from apache_beam.io import ReadFromText
 from apache_beam.io import WriteToText
+from apache_beam.metrics import Metrics
 from apache_beam.utils.pipeline_options import PipelineOptions
 from apache_beam.utils.pipeline_options import SetupOptions
 
 
 class FilterTextFn(beam.DoFn):
   """A DoFn that filters for a specific key based on a regular expression."""
-
-  # A custom aggregator can track values in your pipeline as it runs. Those
-  # values will be displayed in the Dataflow Monitoring UI when this pipeline is
-  # run using the Dataflow service. These aggregators below track the number of
-  # matched and unmatched words. Learn more at
-  # https://cloud.google.com/dataflow/pipelines/dataflow-monitoring-intf about
-  # the Dataflow Monitoring UI.
-  matched_words = beam.Aggregator('matched_words')
-  umatched_words = beam.Aggregator('umatched_words')
-
   def __init__(self, pattern):
     super(FilterTextFn, self).__init__()
     self.pattern = pattern
+    # A custom metric can track values in your pipeline as it runs. Those
+    # values will be available in the monitoring system of the runner used
+    # to run the pipeline. These metrics below track the number of
+    # matched and unmatched words.
+    self.matched_words = Metrics.counter(self.__class__, 'matched_words')
+    self.umatched_words = Metrics.counter(self.__class__, 'umatched_words')
 
   def process(self, context):
     word, _ = context.element
@@ -75,7 +72,7 @@ class FilterTextFn(beam.DoFn):
       # using the Dataflow service, these log lines will appear in the Cloud
       # Logging UI.
       logging.info('Matched %s', word)
-      context.aggregate_to(self.matched_words, 1)
+      self.matched_words.inc()
       yield context.element
     else:
       # Log at the "DEBUG" level each element that is not matched. Different log
      # levels can be used to control which log messages are sent to Cloud
       # Logging.
       # Note currently only "INFO" and higher level logs are emitted to the
       # Cloud Logger. This log message will not be visible in the Cloud Logger.
       logging.debug('Did not match %s', word)
-      context.aggregate_to(self.umatched_words, 1)
+      self.umatched_words.inc()
 
 
 class CountWords(beam.PTransform):


http://git-wip-us.apache.org/repos/asf/beam/blob/93832aa1/sdks/python/apache_beam/metrics/metric_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/metrics/metric_test.py b/sdks/python/apache_beam/metrics/metric_test.py
index c478a85..4860edf 100644
--- a/sdks/python/apache_beam/metrics/metric_test.py
+++ b/sdks/python/apache_beam/metrics/metric_test.py
@@ -59,7 +59,7 @@ class MetricsTest(unittest.TestCase):
     MetricsEnvironment.set_current_container(MetricsContainer('mystep'))
     counter_ns = 'aCounterNamespace'
     distro_ns = 'aDistributionNamespace'
-    name = 'aName'
+    name = 'a_name'
     counter = Metrics.counter(counter_ns, name)
     distro = Metrics.distribution(distro_ns, name)
     counter.inc(10)


http://git-wip-us.apache.org/repos/asf/beam/blob/93832aa1/sdks/python/apache_beam/runners/common.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index cb47513..dbbd9ba 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -402,13 +402,8 @@ class DoFnContext(object):
     else:
       return self.windowed_value.windows
 
-  def aggregate_to(self, aggregator, input_value):
-    self.state.counter_for(aggregator).update(input_value)
-
 # TODO(robertwb): Remove all these adapters once service is updated out.
-
-
 class _LoggingContextAdapter(LoggingContext):
 
   def __init__(self, underlying):


http://git-wip-us.apache.org/repos/asf/beam/blob/93832aa1/sdks/python/apache_beam/transforms/__init__.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/__init__.py b/sdks/python/apache_beam/transforms/__init__.py
index db8e193..847fb8f 100644
--- a/sdks/python/apache_beam/transforms/__init__.py
+++ b/sdks/python/apache_beam/transforms/__init__.py
@@ -19,7 +19,6 @@
 
 # pylint: disable=wildcard-import
 from apache_beam.transforms import combiners
-from apache_beam.transforms.aggregator import *
 from apache_beam.transforms.core import *
 from apache_beam.transforms.ptransform import *
 from apache_beam.transforms.timeutil import *


http://git-wip-us.apache.org/repos/asf/beam/blob/93832aa1/sdks/python/apache_beam/transforms/aggregator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/aggregator.py b/sdks/python/apache_beam/transforms/aggregator.py
deleted file mode 100644
index 05ef635..0000000
--- a/sdks/python/apache_beam/transforms/aggregator.py
+++ /dev/null
@@ -1,120 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Support for user-defined Aggregators.
-
-Aggregators allow steps in a pipeline to perform custom aggregation of
-statistics about the data processed across all workers.  To update an
-aggregator's value, call aggregate_to() on the context passed to a DoFn.
-
-Example:
-import apache_beam as beam
-
-class ExampleDoFn(beam.DoFn):
-  def __init__(self):
-    super(ExampleDoFn, self).__init__()
-    self.simple_counter = beam.Aggregator('example-counter')
-
-  def process(self, context):
-    context.aggregate_to(self.simple_counter, 1)
-    ...
-
-These aggregators may be used by runners to collect and present statistics of
-a pipeline.  For example, in the Google Cloud Dataflow console, aggregators and
-their values show up in the UI under "Custom counters."
-
-You can also query the combined value(s) of an aggregator by calling
-aggregated_value() or aggregated_values() on the result object returned after
-running a pipeline.
-"""
-
-from __future__ import absolute_import
-
-from apache_beam.transforms import core
-
-
-class Aggregator(object):
-  """A user-specified aggregator of statistics about a pipeline step.
-
-  Args:
-    combine_fn: how to combine values input to the aggregation.
-      It must be one of these arithmetic functions:
-
-       - Python's built-in sum, min, max, any, and all.
-       - beam.combiners.MeanCombineFn()
-
-      The default is sum of 64-bit ints.
-
-    type: describes the type that will be accepted as input
-      for aggregation; by default types appropriate to the combine_fn
-      are accepted.
-
-  Example uses::
-
-    import apache_beam as beam
-
-    class ExampleDoFn(beam.DoFn):
-      def __init__(self):
-        super(ExampleDoFn, self).__init__()
-        self.simple_counter = beam.Aggregator('example-counter')
-        self.complex_counter = beam.Aggregator('other-counter', beam.Mean(),
-                                               float)
-
-      def process(self, context):
-        context.aggregate_to(self.simple_counter, 1)
-        context.aggregate_to(self.complex_counter, float(len(context.element))
-  """
-
-  def __init__(self, name, combine_fn=sum, input_type=int):
-    combine_fn = core.CombineFn.maybe_from_callable(combine_fn).for_input_type(
-        input_type)
-    if not _is_supported_kind(combine_fn):
-      raise ValueError(
-          'combine_fn %r (class %r) '
-          'does not map to a supported aggregation kind'
-          % (combine_fn, combine_fn.__class__))
-    self.name = name
-    self.combine_fn = combine_fn
-    self.input_type = input_type
-
-  def __str__(self):
-    return '<%s>' % self._str_internal()
-
-  def __repr__(self):
-    return '<%s at %s>' % (self._str_internal(), hex(id(self)))
-
-  def _str_internal(self):
-    """Internal helper function for both __str__ and __repr__."""
-    def get_name(thing):
-      try:
-        return thing.__name__
-      except AttributeError:
-        return thing.__class__.__name__
-
-    combine_fn_str = get_name(self.combine_fn)
-    input_arg = '(%s)' % get_name(self.input_type) if self.input_type else ''
-    if combine_fn_str == 'sum' and not input_arg:
-      combine_call = ''
-    else:
-      combine_call = ' %s%s' % (combine_fn_str, input_arg)
-    return 'Aggregator %s%s' % (self.name, combine_call)
-
-
-def _is_supported_kind(combine_fn):
-  # pylint: disable=wrong-import-order, wrong-import-position
-  from apache_beam.internal.apiclient import metric_translations
-  return combine_fn.__class__ in metric_translations


http://git-wip-us.apache.org/repos/asf/beam/blob/93832aa1/sdks/python/apache_beam/transforms/aggregator_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/aggregator_test.py b/sdks/python/apache_beam/transforms/aggregator_test.py
deleted file mode 100644
index a2a4144..0000000
--- a/sdks/python/apache_beam/transforms/aggregator_test.py
+++ /dev/null
@@ -1,77 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for Aggregator class."""
-
-import unittest
-
-import apache_beam as beam
-from apache_beam.test_pipeline import TestPipeline
-from apache_beam.transforms import combiners
-from apache_beam.transforms.aggregator import Aggregator
-
-
-class AggregatorTest(unittest.TestCase):
-
-  def test_str(self):
-    basic = Aggregator('a-name')
-    self.assertEqual('<Aggregator a-name SumInt64Fn(int)>', str(basic))
-
-    for_max = Aggregator('max-name', max)
-    self.assertEqual('<Aggregator max-name MaxInt64Fn(int)>', str(for_max))
-
-    for_float = Aggregator('f-name', sum, float)
-    self.assertEqual('<Aggregator f-name SumFloatFn(float)>', str(for_float))
-
-    for_mean = Aggregator('m-name', combiners.MeanCombineFn(), float)
-    self.assertEqual('<Aggregator m-name MeanFloatFn(float)>', str(for_mean))
-
-  def test_aggregation(self):
-
-    mean = combiners.MeanCombineFn()
-    mean.__name__ = 'mean'
-    counter_types = [
-        (sum, int, 6),
-        (min, int, 0),
-        (max, int, 3),
-        (mean, int, 1),
-        (sum, float, 6.0),
-        (min, float, 0.0),
-        (max, float, 3.0),
-        (mean, float, 1.5),
-        (any, int, True),
-        (all, float, False),
-    ]
-    aggregators = [Aggregator('%s_%s' % (f.__name__, t.__name__), f, t)
-                   for f, t, _ in counter_types]
-
-    class UpdateAggregators(beam.DoFn):
-      def process(self, context):
-        for a in aggregators:
-          context.aggregate_to(a, context.element)
-
-    p = TestPipeline()
-    p | beam.Create([0, 1, 2, 3]) | beam.ParDo(UpdateAggregators())  # pylint: disable=expression-not-assigned
-    res = p.run()
-    for (_, _, expected), a in zip(counter_types, aggregators):
-      actual = res.aggregated_values(a).values()[0]
-      self.assertEqual(expected, actual)
-      self.assertEqual(type(expected), type(actual))
-
-
-if __name__ == '__main__':
-  unittest.main()


http://git-wip-us.apache.org/repos/asf/beam/blob/93832aa1/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 70a03ae..20126d3 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -74,7 +74,7 @@ class DoFnProcessContext(DoFnContext):
     windows: windows of the element (in process method only; always None
       in start_bundle and finish_bundle)
     state: a DoFnState object, which holds the runner's internal state
-      for this element. For example, aggregator state is here.
+      for this element. Not used by the pipeline code.
""" @@ -109,16 +109,6 @@ class DoFnProcessContext(DoFnContext): self.timestamp = windowed_value.timestamp self.windows = windowed_value.windows - # TODO(sourabhbajaj): Move as we're trying to deprecate the use of context - def aggregate_to(self, aggregator, input_value): - """Provide a new input value for the aggregator. - - Args: - aggregator: the aggregator to update - input_value: the new value to input to the combine_fn of this aggregator. - """ - self.state.counter_for(aggregator).update(input_value) - class NewDoFn(WithTypeHints, HasDisplayData): """A function object used by a transform with custom processing.