Repository: beam Updated Branches: refs/heads/python-sdk af49908b8 -> d0dc1f375
Revert "Remove dataflow_test.py"

This reverts commit d5b90d8383e662e803ea79b31661250a043bcfd2.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/96fcc7d3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/96fcc7d3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/96fcc7d3

Branch: refs/heads/python-sdk
Commit: 96fcc7d31c2540f867c3a73903c2aa99183a6b8b
Parents: af49908
Author: Robert Bradshaw <rober...@gmail.com>
Authored: Tue Jan 24 09:28:38 2017 -0800
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Tue Jan 24 09:28:38 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/dataflow_test.py | 418 +++++++++++++++++++
 .../apache_beam/transforms/ptransform_test.py | 67 ---
 .../apache_beam/transforms/sideinputs_test.py | 208 +--------
 3 files changed, 419 insertions(+), 274 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/96fcc7d3/sdks/python/apache_beam/dataflow_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py
new file mode 100644
index 0000000..f410230
--- /dev/null
+++ b/sdks/python/apache_beam/dataflow_test.py
@@ -0,0 +1,418 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Integration tests for the dataflow package.""" + +from __future__ import absolute_import + +import logging +import re +import unittest + +import apache_beam as beam +from apache_beam.pvalue import AsDict +from apache_beam.pvalue import AsIter as AllOf +from apache_beam.pvalue import AsList +from apache_beam.pvalue import AsSingleton +from apache_beam.pvalue import EmptySideInput +from apache_beam.pvalue import SideOutputValue +from apache_beam.test_pipeline import TestPipeline +from apache_beam.transforms import Create +from apache_beam.transforms import DoFn +from apache_beam.transforms import FlatMap +from apache_beam.transforms import GroupByKey +from apache_beam.transforms import Map +from apache_beam.transforms import ParDo +from apache_beam.transforms import WindowInto +from apache_beam.transforms.util import assert_that +from apache_beam.transforms.util import equal_to +from apache_beam.transforms.window import IntervalWindow +from apache_beam.transforms.window import WindowFn +from nose.plugins.attrib import attr + + +class DataflowTest(unittest.TestCase): + """Dataflow integration tests.""" + + SAMPLE_DATA = ['aa bb cc aa bb aa \n'] * 10 + SAMPLE_RESULT = [('cc', 10), ('bb', 20), ('aa', 30)] + + @beam.ptransform_fn + def Count(pcoll): # pylint: disable=invalid-name, no-self-argument + """A Count transform: v, ... 
=> (v, n), ...""" + return (pcoll + | 'AddCount' >> Map(lambda x: (x, 1)) + | 'GroupCounts' >> GroupByKey() + | 'AddCounts' >> Map(lambda (x, ones): (x, sum(ones)))) + + @attr('ValidatesRunner') + def test_word_count(self): + pipeline = TestPipeline() + lines = pipeline | 'SomeWords' >> Create(DataflowTest.SAMPLE_DATA) + result = ( + (lines | 'GetWords' >> FlatMap(lambda x: re.findall(r'\w+', x))) + .apply('CountWords', DataflowTest.Count)) + assert_that(result, equal_to(DataflowTest.SAMPLE_RESULT)) + pipeline.run() + + @attr('ValidatesRunner') + def test_map(self): + pipeline = TestPipeline() + lines = pipeline | 'input' >> Create(['a', 'b', 'c']) + result = (lines + | 'upper' >> Map(str.upper) + | 'prefix' >> Map(lambda x, prefix: prefix + x, 'foo-')) + assert_that(result, equal_to(['foo-A', 'foo-B', 'foo-C'])) + pipeline.run() + + @attr('ValidatesRunner') + def test_par_do_with_side_input_as_arg(self): + pipeline = TestPipeline() + words_list = ['aa', 'bb', 'cc'] + words = pipeline | 'SomeWords' >> Create(words_list) + prefix = pipeline | 'SomeString' >> Create(['xyz']) # side in + suffix = 'zyx' + result = words | FlatMap( + 'DecorateWords', + lambda x, pfx, sfx: ['%s-%s-%s' % (pfx, x, sfx)], + AsSingleton(prefix), suffix) + assert_that(result, equal_to(['xyz-%s-zyx' % x for x in words_list])) + pipeline.run() + + @attr('ValidatesRunner') + def test_par_do_with_side_input_as_keyword_arg(self): + pipeline = TestPipeline() + words_list = ['aa', 'bb', 'cc'] + words = pipeline | 'SomeWords' >> Create(words_list) + prefix = 'zyx' + suffix = pipeline | 'SomeString' >> Create(['xyz']) # side in + result = words | FlatMap( + 'DecorateWords', + lambda x, pfx, sfx: ['%s-%s-%s' % (pfx, x, sfx)], + prefix, sfx=AsSingleton(suffix)) + assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list])) + pipeline.run() + + @attr('ValidatesRunner') + def test_par_do_with_do_fn_object(self): + class SomeDoFn(DoFn): + """A custom DoFn for a FlatMap transform.""" + + def 
process(self, context, prefix, suffix): + return ['%s-%s-%s' % (prefix, context.element, suffix)] + + pipeline = TestPipeline() + words_list = ['aa', 'bb', 'cc'] + words = pipeline | 'SomeWords' >> Create(words_list) + prefix = 'zyx' + suffix = pipeline | 'SomeString' >> Create(['xyz']) # side in + result = words | 'DecorateWordsDoFn' >> ParDo( + SomeDoFn(), prefix, suffix=AsSingleton(suffix)) + assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list])) + pipeline.run() + + @attr('ValidatesRunner') + def test_par_do_with_multiple_outputs_and_using_yield(self): + class SomeDoFn(DoFn): + """A custom DoFn using yield.""" + + def process(self, context): + yield context.element + if context.element % 2 == 0: + yield SideOutputValue('even', context.element) + else: + yield SideOutputValue('odd', context.element) + + pipeline = TestPipeline() + nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4]) + results = nums | ParDo( + 'ClassifyNumbers', SomeDoFn()).with_outputs('odd', 'even', main='main') + assert_that(results.main, equal_to([1, 2, 3, 4])) + assert_that(results.odd, equal_to([1, 3]), label='assert:odd') + assert_that(results.even, equal_to([2, 4]), label='assert:even') + pipeline.run() + + @attr('ValidatesRunner') + def test_par_do_with_multiple_outputs_and_using_return(self): + def some_fn(v): + if v % 2 == 0: + return [v, SideOutputValue('even', v)] + else: + return [v, SideOutputValue('odd', v)] + + pipeline = TestPipeline() + nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4]) + results = nums | FlatMap( + 'ClassifyNumbers', some_fn).with_outputs('odd', 'even', main='main') + assert_that(results.main, equal_to([1, 2, 3, 4])) + assert_that(results.odd, equal_to([1, 3]), label='assert:odd') + assert_that(results.even, equal_to([2, 4]), label='assert:even') + pipeline.run() + + @attr('ValidatesRunner') + def test_empty_singleton_side_input(self): + pipeline = TestPipeline() + pcol = pipeline | 'start' >> Create([1, 2]) + side = pipeline | 'side' 
>> Create([]) # Empty side input. + + def my_fn(k, s): + v = ('empty' if isinstance(s, EmptySideInput) else 'full') + return [(k, v)] + result = pcol | 'compute' >> FlatMap(my_fn, AsSingleton(side)) + assert_that(result, equal_to([(1, 'empty'), (2, 'empty')])) + pipeline.run() + + # @attr('ValidatesRunner') + # TODO(BEAM-1124): Temporarily disable it due to test failed running on + # Dataflow service. + def test_multi_valued_singleton_side_input(self): + pipeline = TestPipeline() + pcol = pipeline | 'start' >> Create([1, 2]) + side = pipeline | 'side' >> Create([3, 4]) # 2 values in side input. + pcol | 'compute' >> FlatMap(lambda x, s: [x * s], AsSingleton(side)) # pylint: disable=expression-not-assigned + with self.assertRaises(ValueError): + pipeline.run() + + @attr('ValidatesRunner') + def test_default_value_singleton_side_input(self): + pipeline = TestPipeline() + pcol = pipeline | 'start' >> Create([1, 2]) + side = pipeline | 'side' >> Create([]) # 0 values in side input. + result = pcol | FlatMap(lambda x, s: [x * s], AsSingleton(side, 10)) + assert_that(result, equal_to([10, 20])) + pipeline.run() + + @attr('ValidatesRunner') + def test_iterable_side_input(self): + pipeline = TestPipeline() + pcol = pipeline | 'start' >> Create([1, 2]) + side = pipeline | 'side' >> Create([3, 4]) # 2 values in side input. 
+ result = pcol | FlatMap('compute', + lambda x, s: [x * y for y in s], AllOf(side)) + assert_that(result, equal_to([3, 4, 6, 8])) + pipeline.run() + + @attr('ValidatesRunner') + def test_undeclared_side_outputs(self): + pipeline = TestPipeline() + nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4]) + results = nums | FlatMap( + 'ClassifyNumbers', + lambda x: [x, SideOutputValue('even' if x % 2 == 0 else 'odd', x)] + ).with_outputs() + assert_that(results[None], equal_to([1, 2, 3, 4])) + assert_that(results.odd, equal_to([1, 3]), label='assert:odd') + assert_that(results.even, equal_to([2, 4]), label='assert:even') + pipeline.run() + + @attr('ValidatesRunner') + def test_empty_side_outputs(self): + pipeline = TestPipeline() + nums = pipeline | 'Some Numbers' >> Create([1, 3, 5]) + results = nums | FlatMap( + 'ClassifyNumbers', + lambda x: [x, SideOutputValue('even' if x % 2 == 0 else 'odd', x)] + ).with_outputs() + assert_that(results[None], equal_to([1, 3, 5])) + assert_that(results.odd, equal_to([1, 3, 5]), label='assert:odd') + assert_that(results.even, equal_to([]), label='assert:even') + pipeline.run() + + @attr('ValidatesRunner') + def test_as_list_and_as_dict_side_inputs(self): + a_list = [5, 1, 3, 2, 9] + some_pairs = [('crouton', 17), ('supreme', None)] + pipeline = TestPipeline() + main_input = pipeline | 'main input' >> Create([1]) + side_list = pipeline | 'side list' >> Create(a_list) + side_pairs = pipeline | 'side pairs' >> Create(some_pairs) + results = main_input | FlatMap( + 'concatenate', + lambda x, the_list, the_dict: [[x, the_list, the_dict]], + AsList(side_list), AsDict(side_pairs)) + + def matcher(expected_elem, expected_list, expected_pairs): + def match(actual): + [[actual_elem, actual_list, actual_dict]] = actual + equal_to([expected_elem])([actual_elem]) + equal_to(expected_list)(actual_list) + equal_to(expected_pairs)(actual_dict.iteritems()) + return match + + assert_that(results, matcher(1, a_list, some_pairs)) + pipeline.run() + 
+ @attr('ValidatesRunner') + def test_as_singleton_without_unique_labels(self): + # This should succeed as calling AsSingleton on the same PCollection twice + # with the same defaults will return the same PCollectionView. + a_list = [2] + pipeline = TestPipeline() + main_input = pipeline | 'main input' >> Create([1]) + side_list = pipeline | 'side list' >> Create(a_list) + results = main_input | FlatMap( + 'test', + lambda x, s1, s2: [[x, s1, s2]], + AsSingleton(side_list), AsSingleton(side_list)) + + def matcher(expected_elem, expected_singleton): + def match(actual): + [[actual_elem, actual_singleton1, actual_singleton2]] = actual + equal_to([expected_elem])([actual_elem]) + equal_to([expected_singleton])([actual_singleton1]) + equal_to([expected_singleton])([actual_singleton2]) + return match + + assert_that(results, matcher(1, 2)) + pipeline.run() + + @attr('ValidatesRunner') + def test_as_singleton_with_different_defaults_without_unique_labels(self): + # This should fail as AsSingleton with distinct default values should create + # distinct PCollectionViews with the same full_label. 
+ a_list = [2] + pipeline = TestPipeline() + main_input = pipeline | 'main input' >> Create([1]) + side_list = pipeline | 'side list' >> Create(a_list) + + with self.assertRaises(RuntimeError) as e: + _ = main_input | FlatMap( + 'test', + lambda x, s1, s2: [[x, s1, s2]], + AsSingleton(side_list), AsSingleton(side_list, default_value=3)) + self.assertTrue( + e.exception.message.startswith( + 'Transform "ViewAsSingleton(side list.None)" does not have a ' + 'stable unique label.')) + + @attr('ValidatesRunner') + def test_as_singleton_with_different_defaults_with_unique_labels(self): + a_list = [] + pipeline = TestPipeline() + main_input = pipeline | 'main input' >> Create([1]) + side_list = pipeline | 'side list' >> Create(a_list) + results = main_input | FlatMap( + 'test', + lambda x, s1, s2: [[x, s1, s2]], + AsSingleton('si1', side_list, default_value=2), + AsSingleton('si2', side_list, default_value=3)) + + def matcher(expected_elem, expected_singleton1, expected_singleton2): + def match(actual): + [[actual_elem, actual_singleton1, actual_singleton2]] = actual + equal_to([expected_elem])([actual_elem]) + equal_to([expected_singleton1])([actual_singleton1]) + equal_to([expected_singleton2])([actual_singleton2]) + return match + + assert_that(results, matcher(1, 2, 3)) + pipeline.run() + + @attr('ValidatesRunner') + def test_as_list_without_unique_labels(self): + # This should succeed as calling AsList on the same PCollection twice will + # return the same PCollectionView. 
+ a_list = [1, 2, 3] + pipeline = TestPipeline() + main_input = pipeline | 'main input' >> Create([1]) + side_list = pipeline | 'side list' >> Create(a_list) + results = main_input | FlatMap( + 'test', + lambda x, ls1, ls2: [[x, ls1, ls2]], + AsList(side_list), AsList(side_list)) + + def matcher(expected_elem, expected_list): + def match(actual): + [[actual_elem, actual_list1, actual_list2]] = actual + equal_to([expected_elem])([actual_elem]) + equal_to(expected_list)(actual_list1) + equal_to(expected_list)(actual_list2) + return match + + assert_that(results, matcher(1, [1, 2, 3])) + pipeline.run() + + @attr('ValidatesRunner') + def test_as_list_with_unique_labels(self): + a_list = [1, 2, 3] + pipeline = TestPipeline() + main_input = pipeline | 'main input' >> Create([1]) + side_list = pipeline | 'side list' >> Create(a_list) + results = main_input | FlatMap( + 'test', + lambda x, ls1, ls2: [[x, ls1, ls2]], + AsList(side_list), AsList(side_list, label='label')) + + def matcher(expected_elem, expected_list): + def match(actual): + [[actual_elem, actual_list1, actual_list2]] = actual + equal_to([expected_elem])([actual_elem]) + equal_to(expected_list)(actual_list1) + equal_to(expected_list)(actual_list2) + return match + + assert_that(results, matcher(1, [1, 2, 3])) + pipeline.run() + + @attr('ValidatesRunner') + def test_as_dict_with_unique_labels(self): + some_kvs = [('a', 1), ('b', 2)] + pipeline = TestPipeline() + main_input = pipeline | 'main input' >> Create([1]) + side_kvs = pipeline | 'side kvs' >> Create(some_kvs) + results = main_input | FlatMap( + 'test', + lambda x, dct1, dct2: [[x, dct1, dct2]], + AsDict(side_kvs), AsDict(side_kvs, label='label')) + + def matcher(expected_elem, expected_kvs): + def match(actual): + [[actual_elem, actual_dict1, actual_dict2]] = actual + equal_to([expected_elem])([actual_elem]) + equal_to(expected_kvs)(actual_dict1.iteritems()) + equal_to(expected_kvs)(actual_dict2.iteritems()) + return match + + assert_that(results, 
matcher(1, some_kvs)) + pipeline.run() + + @attr('ValidatesRunner') + def test_window_transform(self): + class TestWindowFn(WindowFn): + """Windowing function adding two disjoint windows to each element.""" + + def assign(self, assign_context): + _ = assign_context + return [IntervalWindow(10, 20), IntervalWindow(20, 30)] + + def merge(self, existing_windows): + return existing_windows + + pipeline = TestPipeline() + numbers = pipeline | 'KVs' >> Create([(1, 10), (2, 20), (3, 30)]) + result = (numbers + | 'W' >> WindowInto(windowfn=TestWindowFn()) + | 'G' >> GroupByKey()) + assert_that( + result, equal_to([(1, [10]), (1, [10]), (2, [20]), + (2, [20]), (3, [30]), (3, [30])])) + pipeline.run() + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() http://git-wip-us.apache.org/repos/asf/beam/blob/96fcc7d3/sdks/python/apache_beam/transforms/ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 68e4482..827bc83 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -24,7 +24,6 @@ import re import unittest import hamcrest as hc -from nose.plugins.attrib import attr import apache_beam as beam from apache_beam.test_pipeline import TestPipeline @@ -190,72 +189,6 @@ class PTransformTest(unittest.TestCase): assert_that(r2.m, equal_to([3, 4, 5]), label='r2') pipeline.run() - @attr('ValidatesRunner') - def test_par_do_with_multiple_outputs_and_using_yield(self): - class SomeDoFn(beam.DoFn): - """A custom DoFn using yield.""" - - def process(self, context): - yield context.element - if context.element % 2 == 0: - yield pvalue.SideOutputValue('even', context.element) - else: - yield pvalue.SideOutputValue('odd', context.element) - - pipeline = TestPipeline() - nums = pipeline | 'Some Numbers' >> 
beam.Create([1, 2, 3, 4]) - results = nums | beam.ParDo( - 'ClassifyNumbers', SomeDoFn()).with_outputs('odd', 'even', main='main') - assert_that(results.main, equal_to([1, 2, 3, 4])) - assert_that(results.odd, equal_to([1, 3]), label='assert:odd') - assert_that(results.even, equal_to([2, 4]), label='assert:even') - pipeline.run() - - @attr('ValidatesRunner') - def test_par_do_with_multiple_outputs_and_using_return(self): - def some_fn(v): - if v % 2 == 0: - return [v, pvalue.SideOutputValue('even', v)] - else: - return [v, pvalue.SideOutputValue('odd', v)] - - pipeline = TestPipeline() - nums = pipeline | 'Some Numbers' >> beam.Create([1, 2, 3, 4]) - results = nums | beam.FlatMap( - 'ClassifyNumbers', some_fn).with_outputs('odd', 'even', main='main') - assert_that(results.main, equal_to([1, 2, 3, 4])) - assert_that(results.odd, equal_to([1, 3]), label='assert:odd') - assert_that(results.even, equal_to([2, 4]), label='assert:even') - pipeline.run() - - @attr('ValidatesRunner') - def test_undeclared_side_outputs(self): - pipeline = TestPipeline() - nums = pipeline | 'Some Numbers' >> beam.Create([1, 2, 3, 4]) - results = nums | beam.FlatMap( - 'ClassifyNumbers', - lambda x: [x, - pvalue.SideOutputValue('even' if x % 2 == 0 else 'odd', x)] - ).with_outputs() - assert_that(results[None], equal_to([1, 2, 3, 4])) - assert_that(results.odd, equal_to([1, 3]), label='assert:odd') - assert_that(results.even, equal_to([2, 4]), label='assert:even') - pipeline.run() - - @attr('ValidatesRunner') - def test_empty_side_outputs(self): - pipeline = TestPipeline() - nums = pipeline | 'Some Numbers' >> beam.Create([1, 3, 5]) - results = nums | beam.FlatMap( - 'ClassifyNumbers', - lambda x: [x, - pvalue.SideOutputValue('even' if x % 2 == 0 else 'odd', x)] - ).with_outputs() - assert_that(results[None], equal_to([1, 3, 5])) - assert_that(results.odd, equal_to([1, 3, 5]), label='assert:odd') - assert_that(results.even, equal_to([]), label='assert:even') - pipeline.run() - def 
test_do_requires_do_fn_returning_iterable(self): # This function is incorrect because it returns an object that isn't an # iterable. http://git-wip-us.apache.org/repos/asf/beam/blob/96fcc7d3/sdks/python/apache_beam/transforms/sideinputs_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py index 9384e7b..a327dc8 100644 --- a/sdks/python/apache_beam/transforms/sideinputs_test.py +++ b/sdks/python/apache_beam/transforms/sideinputs_test.py @@ -20,10 +20,7 @@ import logging import unittest -from nose.plugins.attrib import attr - import apache_beam as beam -from apache_beam.test_pipeline import TestPipeline from apache_beam.transforms import window from apache_beam.transforms.util import assert_that, equal_to @@ -31,7 +28,7 @@ from apache_beam.transforms.util import assert_that, equal_to class SideInputsTest(unittest.TestCase): def create_pipeline(self): - return TestPipeline('DirectRunner') + return beam.Pipeline('DirectRunner') def run_windowed_side_inputs(self, elements, main_window_fn, side_window_fn=None, @@ -128,209 +125,6 @@ class SideInputsTest(unittest.TestCase): (11, {'k11': 'v11'}), ]) - @attr('ValidatesRunner') - def test_empty_singleton_side_input(self): - pipeline = self.create_pipeline() - pcol = pipeline | 'start' >> beam.Create([1, 2]) - side = pipeline | 'side' >> beam.Create([]) # Empty side input. - - def my_fn(k, s): - # TODO(robertwb): Should this be an error as in Java? - v = ('empty' if isinstance(s, beam.pvalue.EmptySideInput) else 'full') - return [(k, v)] - result = pcol | 'compute' >> beam.FlatMap( - my_fn, beam.pvalue.AsSingleton(side)) - assert_that(result, equal_to([(1, 'empty'), (2, 'empty')])) - pipeline.run() - - # @attr('ValidatesRunner') - # TODO(BEAM-1124): Temporarily disable it due to test failed running on - # Dataflow service. 
- def test_multi_valued_singleton_side_input(self): - pipeline = self.create_pipeline() - pcol = pipeline | 'start' >> beam.Create([1, 2]) - side = pipeline | 'side' >> beam.Create([3, 4]) # 2 values in side input. - pcol | 'compute' >> beam.FlatMap( # pylint: disable=expression-not-assigned - lambda x, s: [x * s], beam.pvalue.AsSingleton(side)) - with self.assertRaises(ValueError): - pipeline.run() - - @attr('ValidatesRunner') - def test_default_value_singleton_side_input(self): - pipeline = self.create_pipeline() - pcol = pipeline | 'start' >> beam.Create([1, 2]) - side = pipeline | 'side' >> beam.Create([]) # 0 values in side input. - result = pcol | beam.FlatMap( - lambda x, s: [x * s], beam.pvalue.AsSingleton(side, 10)) - assert_that(result, equal_to([10, 20])) - pipeline.run() - - @attr('ValidatesRunner') - def test_iterable_side_input(self): - pipeline = self.create_pipeline() - pcol = pipeline | 'start' >> beam.Create([1, 2]) - side = pipeline | 'side' >> beam.Create([3, 4]) # 2 values in side input. 
- result = pcol | 'compute' >> beam.FlatMap( - lambda x, s: [x * y for y in s], - beam.pvalue.AsIter(side)) - assert_that(result, equal_to([3, 4, 6, 8])) - pipeline.run() - - @attr('ValidatesRunner') - def test_as_list_and_as_dict_side_inputs(self): - a_list = [5, 1, 3, 2, 9] - some_pairs = [('crouton', 17), ('supreme', None)] - pipeline = self.create_pipeline() - main_input = pipeline | 'main input' >> beam.Create([1]) - side_list = pipeline | 'side list' >> beam.Create(a_list) - side_pairs = pipeline | 'side pairs' >> beam.Create(some_pairs) - results = main_input | 'concatenate' >> beam.FlatMap( - lambda x, the_list, the_dict: [[x, the_list, the_dict]], - beam.pvalue.AsList(side_list), beam.pvalue.AsDict(side_pairs)) - - def matcher(expected_elem, expected_list, expected_pairs): - def match(actual): - [[actual_elem, actual_list, actual_dict]] = actual - equal_to([expected_elem])([actual_elem]) - equal_to(expected_list)(actual_list) - equal_to(expected_pairs)(actual_dict.iteritems()) - return match - - assert_that(results, matcher(1, a_list, some_pairs)) - pipeline.run() - - @attr('ValidatesRunner') - def test_as_singleton_without_unique_labels(self): - # This should succeed as calling beam.pvalue.AsSingleton on the same - # PCollection twice with the same defaults will return the same - # PCollectionView. 
- a_list = [2] - pipeline = self.create_pipeline() - main_input = pipeline | 'main input' >> beam.Create([1]) - side_list = pipeline | 'side list' >> beam.Create(a_list) - results = main_input | beam.FlatMap( - lambda x, s1, s2: [[x, s1, s2]], - beam.pvalue.AsSingleton(side_list), beam.pvalue.AsSingleton(side_list)) - - def matcher(expected_elem, expected_singleton): - def match(actual): - [[actual_elem, actual_singleton1, actual_singleton2]] = actual - equal_to([expected_elem])([actual_elem]) - equal_to([expected_singleton])([actual_singleton1]) - equal_to([expected_singleton])([actual_singleton2]) - return match - - assert_that(results, matcher(1, 2)) - pipeline.run() - - @attr('ValidatesRunner') - def test_as_singleton_with_different_defaults_without_unique_labels(self): - # This should fail as beam.pvalue.AsSingleton with distinct default values - # should beam.Create distinct PCollectionViews with the same full_label. - a_list = [2] - pipeline = self.create_pipeline() - main_input = pipeline | 'main input' >> beam.Create([1]) - side_list = pipeline | 'side list' >> beam.Create(a_list) - - with self.assertRaises(RuntimeError) as e: - _ = main_input | beam.FlatMap( - lambda x, s1, s2: [[x, s1, s2]], - beam.pvalue.AsSingleton(side_list), - beam.pvalue.AsSingleton(side_list, default_value=3)) - self.assertTrue( - e.exception.message.startswith( - 'Transform "ViewAsSingleton(side list.None)" does not have a ' - 'stable unique label.')) - - @attr('ValidatesRunner') - def test_as_singleton_with_different_defaults_with_unique_labels(self): - a_list = [] - pipeline = self.create_pipeline() - main_input = pipeline | 'main input' >> beam.Create([1]) - side_list = pipeline | 'side list' >> beam.Create(a_list) - results = main_input | beam.FlatMap( - lambda x, s1, s2: [[x, s1, s2]], - beam.pvalue.AsSingleton('si1', side_list, default_value=2), - beam.pvalue.AsSingleton('si2', side_list, default_value=3)) - - def matcher(expected_elem, expected_singleton1, 
expected_singleton2): - def match(actual): - [[actual_elem, actual_singleton1, actual_singleton2]] = actual - equal_to([expected_elem])([actual_elem]) - equal_to([expected_singleton1])([actual_singleton1]) - equal_to([expected_singleton2])([actual_singleton2]) - return match - - assert_that(results, matcher(1, 2, 3)) - pipeline.run() - - @attr('ValidatesRunner') - def test_as_list_without_unique_labels(self): - # This should succeed as calling beam.pvalue.AsList on the same - # PCollection twice will return the same PCollectionView. - a_list = [1, 2, 3] - pipeline = self.create_pipeline() - main_input = pipeline | 'main input' >> beam.Create([1]) - side_list = pipeline | 'side list' >> beam.Create(a_list) - results = main_input | beam.FlatMap( - lambda x, ls1, ls2: [[x, ls1, ls2]], - beam.pvalue.AsList(side_list), beam.pvalue.AsList(side_list)) - - def matcher(expected_elem, expected_list): - def match(actual): - [[actual_elem, actual_list1, actual_list2]] = actual - equal_to([expected_elem])([actual_elem]) - equal_to(expected_list)(actual_list1) - equal_to(expected_list)(actual_list2) - return match - - assert_that(results, matcher(1, [1, 2, 3])) - pipeline.run() - - @attr('ValidatesRunner') - def test_as_list_with_unique_labels(self): - a_list = [1, 2, 3] - pipeline = self.create_pipeline() - main_input = pipeline | 'main input' >> beam.Create([1]) - side_list = pipeline | 'side list' >> beam.Create(a_list) - results = main_input | beam.FlatMap( - lambda x, ls1, ls2: [[x, ls1, ls2]], - beam.pvalue.AsList(side_list), - beam.pvalue.AsList(side_list, label='label')) - - def matcher(expected_elem, expected_list): - def match(actual): - [[actual_elem, actual_list1, actual_list2]] = actual - equal_to([expected_elem])([actual_elem]) - equal_to(expected_list)(actual_list1) - equal_to(expected_list)(actual_list2) - return match - - assert_that(results, matcher(1, [1, 2, 3])) - pipeline.run() - - @attr('ValidatesRunner') - def test_as_dict_with_unique_labels(self): - 
some_kvs = [('a', 1), ('b', 2)] - pipeline = self.create_pipeline() - main_input = pipeline | 'main input' >> beam.Create([1]) - side_kvs = pipeline | 'side kvs' >> beam.Create(some_kvs) - results = main_input | beam.FlatMap( - lambda x, dct1, dct2: [[x, dct1, dct2]], - beam.pvalue.AsDict(side_kvs), - beam.pvalue.AsDict(side_kvs, label='label')) - - def matcher(expected_elem, expected_kvs): - def match(actual): - [[actual_elem, actual_dict1, actual_dict2]] = actual - equal_to([expected_elem])([actual_elem]) - equal_to(expected_kvs)(actual_dict1.iteritems()) - equal_to(expected_kvs)(actual_dict2.iteritems()) - return match - - assert_that(results, matcher(1, some_kvs)) - pipeline.run() - if __name__ == '__main__': logging.getLogger().setLevel(logging.DEBUG)