Repository: beam
Updated Branches:
  refs/heads/master 7c425b097 -> 7f50ea2e5
Rename OutputValue to TaggedOutput. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9d3aebea Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9d3aebea Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9d3aebea Branch: refs/heads/master Commit: 9d3aebeab7891bebdbf13c7567478c6b5fe9b3f4 Parents: 7c425b0 Author: Robert Bradshaw <rober...@google.com> Authored: Mon May 1 18:15:32 2017 -0500 Committer: Robert Bradshaw <rober...@gmail.com> Committed: Mon May 1 17:45:48 2017 -0700 ---------------------------------------------------------------------- .../examples/cookbook/multiple_output_pardo.py | 6 +++--- .../apache_beam/examples/snippets/snippets_test.py | 6 +++--- sdks/python/apache_beam/pvalue.py | 6 +++--- sdks/python/apache_beam/runners/common.py | 10 +++++----- .../direct/consumer_tracking_pipeline_visitor_test.py | 2 +- sdks/python/apache_beam/transforms/core.py | 4 ++-- sdks/python/apache_beam/transforms/ptransform_test.py | 12 ++++++------ sdks/python/apache_beam/typehints/typecheck.py | 4 ++-- 8 files changed, 25 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/9d3aebea/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py index b324ed1..9c82df4 100644 --- a/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py +++ b/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py @@ -97,15 +97,15 @@ class SplitLinesToWordsFn(beam.DoFn): """ # yield a count (integer) to the OUTPUT_TAG_CHARACTER_COUNT tagged # collection. 
- yield pvalue.OutputValue(self.OUTPUT_TAG_CHARACTER_COUNT, - len(element)) + yield pvalue.TaggedOutput( + self.OUTPUT_TAG_CHARACTER_COUNT, len(element)) words = re.findall(r'[A-Za-z\']+', element) for word in words: if len(word) <= 3: # yield word as an output to the OUTPUT_TAG_SHORT_WORDS tagged # collection. - yield pvalue.OutputValue(self.OUTPUT_TAG_SHORT_WORDS, word) + yield pvalue.TaggedOutput(self.OUTPUT_TAG_SHORT_WORDS, word) else: # yield word to add it to the main collection. yield word http://git-wip-us.apache.org/repos/asf/beam/blob/9d3aebea/sdks/python/apache_beam/examples/snippets/snippets_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index a3cdb24..afd7918 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -186,11 +186,11 @@ class ParDoTest(unittest.TestCase): yield element else: # Emit this word's long length to the 'above_cutoff_lengths' output. - yield pvalue.OutputValue( + yield pvalue.TaggedOutput( 'above_cutoff_lengths', len(element)) if element.startswith(marker): # Emit this word to a different output with the 'marked strings' tag. 
- yield pvalue.OutputValue('marked strings', element) + yield pvalue.TaggedOutput('marked strings', element) # [END model_pardo_emitting_values_on_tagged_outputs] words = ['a', 'an', 'the', 'music', 'xyz'] @@ -226,7 +226,7 @@ class ParDoTest(unittest.TestCase): # [START model_pardo_with_undeclared_outputs] def even_odd(x): - yield pvalue.OutputValue('odd' if x % 2 else 'even', x) + yield pvalue.TaggedOutput('odd' if x % 2 else 'even', x) if x % 10 == 0: yield x http://git-wip-us.apache.org/repos/asf/beam/blob/9d3aebea/sdks/python/apache_beam/pvalue.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py index d873669..2242c5a 100644 --- a/sdks/python/apache_beam/pvalue.py +++ b/sdks/python/apache_beam/pvalue.py @@ -230,19 +230,19 @@ class DoOutputsTuple(object): return pcoll -class OutputValue(object): +class TaggedOutput(object): """An object representing a tagged value. ParDo, Map, and FlatMap transforms can emit values on multiple outputs which are distinguished by string tags. The DoFn will return plain values - if it wants to emit on the main output and OutputValue objects + if it wants to emit on the main output and TaggedOutput objects if it wants to emit a value on a specific tagged output. 
""" def __init__(self, tag, value): if not isinstance(tag, basestring): raise TypeError( - 'Attempting to create a OutputValue with non-string tag %s' % tag) + 'Attempting to create a TaggedOutput with non-string tag %s' % tag) self.tag = tag self.value = value http://git-wip-us.apache.org/repos/asf/beam/blob/9d3aebea/sdks/python/apache_beam/runners/common.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 1c3e541..045c109 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -23,7 +23,7 @@ import sys from apache_beam.internal import util from apache_beam.metrics.execution import ScopedMetricsContainer -from apache_beam.pvalue import OutputValue +from apache_beam.pvalue import TaggedOutput from apache_beam.transforms import core from apache_beam.transforms.window import TimestampedValue from apache_beam.transforms.window import WindowFn @@ -430,7 +430,7 @@ class OutputProcessor(object): def process_outputs(self, windowed_input_element, results): """Dispatch the result of process computation to the appropriate receivers. - A value wrapped in a OutputValue object will be unwrapped and + A value wrapped in a TaggedOutput object will be unwrapped and then dispatched to the appropriate indexed output. """ if results is None: @@ -438,7 +438,7 @@ class OutputProcessor(object): for result in results: tag = None - if isinstance(result, OutputValue): + if isinstance(result, TaggedOutput): tag = result.tag if not isinstance(tag, basestring): raise TypeError('In %s, tag %s is not a string' % (self, tag)) @@ -472,7 +472,7 @@ class OutputProcessor(object): def finish_bundle_outputs(self, results): """Dispatch the result of finish_bundle to the appropriate receivers. 
- A value wrapped in a OutputValue object will be unwrapped and + A value wrapped in a TaggedOutput object will be unwrapped and then dispatched to the appropriate indexed output. """ if results is None: @@ -480,7 +480,7 @@ class OutputProcessor(object): for result in results: tag = None - if isinstance(result, OutputValue): + if isinstance(result, TaggedOutput): tag = result.tag if not isinstance(tag, basestring): raise TypeError('In %s, tag %s is not a string' % (self, tag)) http://git-wip-us.apache.org/repos/asf/beam/blob/9d3aebea/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py index 3ed553e..97d1ee8 100644 --- a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py +++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py @@ -76,7 +76,7 @@ class ConsumerTrackingPipelineVisitorTest(unittest.TestCase): def process(self, element): if element < 0: - yield pvalue.OutputValue('tag_negative', element) + yield pvalue.TaggedOutput('tag_negative', element) else: yield element http://git-wip-us.apache.org/repos/asf/beam/blob/9d3aebea/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 918c46e..8e3c9a2 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -184,7 +184,7 @@ class DoFn(WithTypeHints, HasDisplayData): trivial_inference.infer_return_type(self.process, [input_type])) def _strip_output_annotations(self, type_hint): - annotations = (TimestampedValue, WindowedValue, pvalue.OutputValue) + annotations = 
(TimestampedValue, WindowedValue, pvalue.TaggedOutput) # TODO(robertwb): These should be parameterized types that the # type inferencer understands. if (type_hint in annotations @@ -1157,7 +1157,7 @@ class Partition(PTransformWithSideInputs): '%d not in [0, %d)' % (partition, n)) # Each input is directed into the output that corresponds to the # selected partition. - yield pvalue.OutputValue(str(partition), element) + yield pvalue.TaggedOutput(str(partition), element) def make_fn(self, fn): return fn if isinstance(fn, PartitionFn) else CallableWrapperPartitionFn(fn) http://git-wip-us.apache.org/repos/asf/beam/blob/9d3aebea/sdks/python/apache_beam/transforms/ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index ae77227..80c9768 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -214,9 +214,9 @@ class PTransformTest(unittest.TestCase): def process(self, element): yield element if element % 2 == 0: - yield pvalue.OutputValue('even', element) + yield pvalue.TaggedOutput('even', element) else: - yield pvalue.OutputValue('odd', element) + yield pvalue.TaggedOutput('odd', element) pipeline = TestPipeline() nums = pipeline | 'Some Numbers' >> beam.Create([1, 2, 3, 4]) @@ -231,8 +231,8 @@ class PTransformTest(unittest.TestCase): def test_par_do_with_multiple_outputs_and_using_return(self): def some_fn(v): if v % 2 == 0: - return [v, pvalue.OutputValue('even', v)] - return [v, pvalue.OutputValue('odd', v)] + return [v, pvalue.TaggedOutput('even', v)] + return [v, pvalue.TaggedOutput('odd', v)] pipeline = TestPipeline() nums = pipeline | 'Some Numbers' >> beam.Create([1, 2, 3, 4]) @@ -249,7 +249,7 @@ class PTransformTest(unittest.TestCase): nums = pipeline | 'Some Numbers' >> beam.Create([1, 2, 3, 4]) results = nums | 'ClassifyNumbers' 
>> beam.FlatMap( lambda x: [x, - pvalue.OutputValue('even' if x % 2 == 0 else 'odd', x)] + pvalue.TaggedOutput('even' if x % 2 == 0 else 'odd', x)] ).with_outputs() assert_that(results[None], equal_to([1, 2, 3, 4])) assert_that(results.odd, equal_to([1, 3]), label='assert:odd') @@ -262,7 +262,7 @@ class PTransformTest(unittest.TestCase): nums = pipeline | 'Some Numbers' >> beam.Create([1, 3, 5]) results = nums | 'ClassifyNumbers' >> beam.FlatMap( lambda x: [x, - pvalue.OutputValue('even' if x % 2 == 0 else 'odd', x)] + pvalue.TaggedOutput('even' if x % 2 == 0 else 'odd', x)] ).with_outputs() assert_that(results[None], equal_to([1, 3, 5])) assert_that(results.odd, equal_to([1, 3, 5]), label='assert:odd') http://git-wip-us.apache.org/repos/asf/beam/blob/9d3aebea/sdks/python/apache_beam/typehints/typecheck.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/typehints/typecheck.py b/sdks/python/apache_beam/typehints/typecheck.py index e475d9d..160d104 100644 --- a/sdks/python/apache_beam/typehints/typecheck.py +++ b/sdks/python/apache_beam/typehints/typecheck.py @@ -22,7 +22,7 @@ import inspect import sys import types -from apache_beam.pvalue import OutputValue +from apache_beam.pvalue import TaggedOutput from apache_beam.transforms.core import DoFn from apache_beam.transforms.window import WindowedValue from apache_beam.typehints import check_constraint @@ -136,7 +136,7 @@ class TypeCheckWrapperDoFn(AbstractDoFnWrapper): def type_check_output(o): # TODO(robertwb): Multi-output. - x = o.value if isinstance(o, (OutputValue, WindowedValue)) else o + x = o.value if isinstance(o, (TaggedOutput, WindowedValue)) else o self._type_check(self._output_type_hint, x, is_input=False) # If the return type is a generator, then we will need to interleave our