Remove dataflow_test.py. Many of these tests were redundant with tests elsewhere, and the ones that weren't were moved closer to similar tests.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d5b90d83 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d5b90d83 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d5b90d83 Branch: refs/heads/python-sdk Commit: d5b90d8383e662e803ea79b31661250a043bcfd2 Parents: 01b3628 Author: Robert Bradshaw <rober...@gmail.com> Authored: Sat Jan 21 21:53:42 2017 -0800 Committer: Robert Bradshaw <rober...@google.com> Committed: Mon Jan 23 14:36:55 2017 -0800 ---------------------------------------------------------------------- sdks/python/apache_beam/dataflow_test.py | 418 ------------------- .../apache_beam/transforms/ptransform_test.py | 67 +++ .../apache_beam/transforms/sideinputs_test.py | 208 ++++++++- 3 files changed, 274 insertions(+), 419 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d5b90d83/sdks/python/apache_beam/dataflow_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py deleted file mode 100644 index f410230..0000000 --- a/sdks/python/apache_beam/dataflow_test.py +++ /dev/null @@ -1,418 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Integration tests for the dataflow package.""" - -from __future__ import absolute_import - -import logging -import re -import unittest - -import apache_beam as beam -from apache_beam.pvalue import AsDict -from apache_beam.pvalue import AsIter as AllOf -from apache_beam.pvalue import AsList -from apache_beam.pvalue import AsSingleton -from apache_beam.pvalue import EmptySideInput -from apache_beam.pvalue import SideOutputValue -from apache_beam.test_pipeline import TestPipeline -from apache_beam.transforms import Create -from apache_beam.transforms import DoFn -from apache_beam.transforms import FlatMap -from apache_beam.transforms import GroupByKey -from apache_beam.transforms import Map -from apache_beam.transforms import ParDo -from apache_beam.transforms import WindowInto -from apache_beam.transforms.util import assert_that -from apache_beam.transforms.util import equal_to -from apache_beam.transforms.window import IntervalWindow -from apache_beam.transforms.window import WindowFn -from nose.plugins.attrib import attr - - -class DataflowTest(unittest.TestCase): - """Dataflow integration tests.""" - - SAMPLE_DATA = ['aa bb cc aa bb aa \n'] * 10 - SAMPLE_RESULT = [('cc', 10), ('bb', 20), ('aa', 30)] - - @beam.ptransform_fn - def Count(pcoll): # pylint: disable=invalid-name, no-self-argument - """A Count transform: v, ... 
=> (v, n), ...""" - return (pcoll - | 'AddCount' >> Map(lambda x: (x, 1)) - | 'GroupCounts' >> GroupByKey() - | 'AddCounts' >> Map(lambda (x, ones): (x, sum(ones)))) - - @attr('ValidatesRunner') - def test_word_count(self): - pipeline = TestPipeline() - lines = pipeline | 'SomeWords' >> Create(DataflowTest.SAMPLE_DATA) - result = ( - (lines | 'GetWords' >> FlatMap(lambda x: re.findall(r'\w+', x))) - .apply('CountWords', DataflowTest.Count)) - assert_that(result, equal_to(DataflowTest.SAMPLE_RESULT)) - pipeline.run() - - @attr('ValidatesRunner') - def test_map(self): - pipeline = TestPipeline() - lines = pipeline | 'input' >> Create(['a', 'b', 'c']) - result = (lines - | 'upper' >> Map(str.upper) - | 'prefix' >> Map(lambda x, prefix: prefix + x, 'foo-')) - assert_that(result, equal_to(['foo-A', 'foo-B', 'foo-C'])) - pipeline.run() - - @attr('ValidatesRunner') - def test_par_do_with_side_input_as_arg(self): - pipeline = TestPipeline() - words_list = ['aa', 'bb', 'cc'] - words = pipeline | 'SomeWords' >> Create(words_list) - prefix = pipeline | 'SomeString' >> Create(['xyz']) # side in - suffix = 'zyx' - result = words | FlatMap( - 'DecorateWords', - lambda x, pfx, sfx: ['%s-%s-%s' % (pfx, x, sfx)], - AsSingleton(prefix), suffix) - assert_that(result, equal_to(['xyz-%s-zyx' % x for x in words_list])) - pipeline.run() - - @attr('ValidatesRunner') - def test_par_do_with_side_input_as_keyword_arg(self): - pipeline = TestPipeline() - words_list = ['aa', 'bb', 'cc'] - words = pipeline | 'SomeWords' >> Create(words_list) - prefix = 'zyx' - suffix = pipeline | 'SomeString' >> Create(['xyz']) # side in - result = words | FlatMap( - 'DecorateWords', - lambda x, pfx, sfx: ['%s-%s-%s' % (pfx, x, sfx)], - prefix, sfx=AsSingleton(suffix)) - assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list])) - pipeline.run() - - @attr('ValidatesRunner') - def test_par_do_with_do_fn_object(self): - class SomeDoFn(DoFn): - """A custom DoFn for a FlatMap transform.""" - - def 
process(self, context, prefix, suffix): - return ['%s-%s-%s' % (prefix, context.element, suffix)] - - pipeline = TestPipeline() - words_list = ['aa', 'bb', 'cc'] - words = pipeline | 'SomeWords' >> Create(words_list) - prefix = 'zyx' - suffix = pipeline | 'SomeString' >> Create(['xyz']) # side in - result = words | 'DecorateWordsDoFn' >> ParDo( - SomeDoFn(), prefix, suffix=AsSingleton(suffix)) - assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list])) - pipeline.run() - - @attr('ValidatesRunner') - def test_par_do_with_multiple_outputs_and_using_yield(self): - class SomeDoFn(DoFn): - """A custom DoFn using yield.""" - - def process(self, context): - yield context.element - if context.element % 2 == 0: - yield SideOutputValue('even', context.element) - else: - yield SideOutputValue('odd', context.element) - - pipeline = TestPipeline() - nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4]) - results = nums | ParDo( - 'ClassifyNumbers', SomeDoFn()).with_outputs('odd', 'even', main='main') - assert_that(results.main, equal_to([1, 2, 3, 4])) - assert_that(results.odd, equal_to([1, 3]), label='assert:odd') - assert_that(results.even, equal_to([2, 4]), label='assert:even') - pipeline.run() - - @attr('ValidatesRunner') - def test_par_do_with_multiple_outputs_and_using_return(self): - def some_fn(v): - if v % 2 == 0: - return [v, SideOutputValue('even', v)] - else: - return [v, SideOutputValue('odd', v)] - - pipeline = TestPipeline() - nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4]) - results = nums | FlatMap( - 'ClassifyNumbers', some_fn).with_outputs('odd', 'even', main='main') - assert_that(results.main, equal_to([1, 2, 3, 4])) - assert_that(results.odd, equal_to([1, 3]), label='assert:odd') - assert_that(results.even, equal_to([2, 4]), label='assert:even') - pipeline.run() - - @attr('ValidatesRunner') - def test_empty_singleton_side_input(self): - pipeline = TestPipeline() - pcol = pipeline | 'start' >> Create([1, 2]) - side = pipeline | 'side' 
>> Create([]) # Empty side input. - - def my_fn(k, s): - v = ('empty' if isinstance(s, EmptySideInput) else 'full') - return [(k, v)] - result = pcol | 'compute' >> FlatMap(my_fn, AsSingleton(side)) - assert_that(result, equal_to([(1, 'empty'), (2, 'empty')])) - pipeline.run() - - # @attr('ValidatesRunner') - # TODO(BEAM-1124): Temporarily disable it due to test failed running on - # Dataflow service. - def test_multi_valued_singleton_side_input(self): - pipeline = TestPipeline() - pcol = pipeline | 'start' >> Create([1, 2]) - side = pipeline | 'side' >> Create([3, 4]) # 2 values in side input. - pcol | 'compute' >> FlatMap(lambda x, s: [x * s], AsSingleton(side)) # pylint: disable=expression-not-assigned - with self.assertRaises(ValueError): - pipeline.run() - - @attr('ValidatesRunner') - def test_default_value_singleton_side_input(self): - pipeline = TestPipeline() - pcol = pipeline | 'start' >> Create([1, 2]) - side = pipeline | 'side' >> Create([]) # 0 values in side input. - result = pcol | FlatMap(lambda x, s: [x * s], AsSingleton(side, 10)) - assert_that(result, equal_to([10, 20])) - pipeline.run() - - @attr('ValidatesRunner') - def test_iterable_side_input(self): - pipeline = TestPipeline() - pcol = pipeline | 'start' >> Create([1, 2]) - side = pipeline | 'side' >> Create([3, 4]) # 2 values in side input. 
- result = pcol | FlatMap('compute', - lambda x, s: [x * y for y in s], AllOf(side)) - assert_that(result, equal_to([3, 4, 6, 8])) - pipeline.run() - - @attr('ValidatesRunner') - def test_undeclared_side_outputs(self): - pipeline = TestPipeline() - nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4]) - results = nums | FlatMap( - 'ClassifyNumbers', - lambda x: [x, SideOutputValue('even' if x % 2 == 0 else 'odd', x)] - ).with_outputs() - assert_that(results[None], equal_to([1, 2, 3, 4])) - assert_that(results.odd, equal_to([1, 3]), label='assert:odd') - assert_that(results.even, equal_to([2, 4]), label='assert:even') - pipeline.run() - - @attr('ValidatesRunner') - def test_empty_side_outputs(self): - pipeline = TestPipeline() - nums = pipeline | 'Some Numbers' >> Create([1, 3, 5]) - results = nums | FlatMap( - 'ClassifyNumbers', - lambda x: [x, SideOutputValue('even' if x % 2 == 0 else 'odd', x)] - ).with_outputs() - assert_that(results[None], equal_to([1, 3, 5])) - assert_that(results.odd, equal_to([1, 3, 5]), label='assert:odd') - assert_that(results.even, equal_to([]), label='assert:even') - pipeline.run() - - @attr('ValidatesRunner') - def test_as_list_and_as_dict_side_inputs(self): - a_list = [5, 1, 3, 2, 9] - some_pairs = [('crouton', 17), ('supreme', None)] - pipeline = TestPipeline() - main_input = pipeline | 'main input' >> Create([1]) - side_list = pipeline | 'side list' >> Create(a_list) - side_pairs = pipeline | 'side pairs' >> Create(some_pairs) - results = main_input | FlatMap( - 'concatenate', - lambda x, the_list, the_dict: [[x, the_list, the_dict]], - AsList(side_list), AsDict(side_pairs)) - - def matcher(expected_elem, expected_list, expected_pairs): - def match(actual): - [[actual_elem, actual_list, actual_dict]] = actual - equal_to([expected_elem])([actual_elem]) - equal_to(expected_list)(actual_list) - equal_to(expected_pairs)(actual_dict.iteritems()) - return match - - assert_that(results, matcher(1, a_list, some_pairs)) - pipeline.run() - 
- @attr('ValidatesRunner') - def test_as_singleton_without_unique_labels(self): - # This should succeed as calling AsSingleton on the same PCollection twice - # with the same defaults will return the same PCollectionView. - a_list = [2] - pipeline = TestPipeline() - main_input = pipeline | 'main input' >> Create([1]) - side_list = pipeline | 'side list' >> Create(a_list) - results = main_input | FlatMap( - 'test', - lambda x, s1, s2: [[x, s1, s2]], - AsSingleton(side_list), AsSingleton(side_list)) - - def matcher(expected_elem, expected_singleton): - def match(actual): - [[actual_elem, actual_singleton1, actual_singleton2]] = actual - equal_to([expected_elem])([actual_elem]) - equal_to([expected_singleton])([actual_singleton1]) - equal_to([expected_singleton])([actual_singleton2]) - return match - - assert_that(results, matcher(1, 2)) - pipeline.run() - - @attr('ValidatesRunner') - def test_as_singleton_with_different_defaults_without_unique_labels(self): - # This should fail as AsSingleton with distinct default values should create - # distinct PCollectionViews with the same full_label. 
- a_list = [2] - pipeline = TestPipeline() - main_input = pipeline | 'main input' >> Create([1]) - side_list = pipeline | 'side list' >> Create(a_list) - - with self.assertRaises(RuntimeError) as e: - _ = main_input | FlatMap( - 'test', - lambda x, s1, s2: [[x, s1, s2]], - AsSingleton(side_list), AsSingleton(side_list, default_value=3)) - self.assertTrue( - e.exception.message.startswith( - 'Transform "ViewAsSingleton(side list.None)" does not have a ' - 'stable unique label.')) - - @attr('ValidatesRunner') - def test_as_singleton_with_different_defaults_with_unique_labels(self): - a_list = [] - pipeline = TestPipeline() - main_input = pipeline | 'main input' >> Create([1]) - side_list = pipeline | 'side list' >> Create(a_list) - results = main_input | FlatMap( - 'test', - lambda x, s1, s2: [[x, s1, s2]], - AsSingleton('si1', side_list, default_value=2), - AsSingleton('si2', side_list, default_value=3)) - - def matcher(expected_elem, expected_singleton1, expected_singleton2): - def match(actual): - [[actual_elem, actual_singleton1, actual_singleton2]] = actual - equal_to([expected_elem])([actual_elem]) - equal_to([expected_singleton1])([actual_singleton1]) - equal_to([expected_singleton2])([actual_singleton2]) - return match - - assert_that(results, matcher(1, 2, 3)) - pipeline.run() - - @attr('ValidatesRunner') - def test_as_list_without_unique_labels(self): - # This should succeed as calling AsList on the same PCollection twice will - # return the same PCollectionView. 
- a_list = [1, 2, 3] - pipeline = TestPipeline() - main_input = pipeline | 'main input' >> Create([1]) - side_list = pipeline | 'side list' >> Create(a_list) - results = main_input | FlatMap( - 'test', - lambda x, ls1, ls2: [[x, ls1, ls2]], - AsList(side_list), AsList(side_list)) - - def matcher(expected_elem, expected_list): - def match(actual): - [[actual_elem, actual_list1, actual_list2]] = actual - equal_to([expected_elem])([actual_elem]) - equal_to(expected_list)(actual_list1) - equal_to(expected_list)(actual_list2) - return match - - assert_that(results, matcher(1, [1, 2, 3])) - pipeline.run() - - @attr('ValidatesRunner') - def test_as_list_with_unique_labels(self): - a_list = [1, 2, 3] - pipeline = TestPipeline() - main_input = pipeline | 'main input' >> Create([1]) - side_list = pipeline | 'side list' >> Create(a_list) - results = main_input | FlatMap( - 'test', - lambda x, ls1, ls2: [[x, ls1, ls2]], - AsList(side_list), AsList(side_list, label='label')) - - def matcher(expected_elem, expected_list): - def match(actual): - [[actual_elem, actual_list1, actual_list2]] = actual - equal_to([expected_elem])([actual_elem]) - equal_to(expected_list)(actual_list1) - equal_to(expected_list)(actual_list2) - return match - - assert_that(results, matcher(1, [1, 2, 3])) - pipeline.run() - - @attr('ValidatesRunner') - def test_as_dict_with_unique_labels(self): - some_kvs = [('a', 1), ('b', 2)] - pipeline = TestPipeline() - main_input = pipeline | 'main input' >> Create([1]) - side_kvs = pipeline | 'side kvs' >> Create(some_kvs) - results = main_input | FlatMap( - 'test', - lambda x, dct1, dct2: [[x, dct1, dct2]], - AsDict(side_kvs), AsDict(side_kvs, label='label')) - - def matcher(expected_elem, expected_kvs): - def match(actual): - [[actual_elem, actual_dict1, actual_dict2]] = actual - equal_to([expected_elem])([actual_elem]) - equal_to(expected_kvs)(actual_dict1.iteritems()) - equal_to(expected_kvs)(actual_dict2.iteritems()) - return match - - assert_that(results, 
matcher(1, some_kvs)) - pipeline.run() - - @attr('ValidatesRunner') - def test_window_transform(self): - class TestWindowFn(WindowFn): - """Windowing function adding two disjoint windows to each element.""" - - def assign(self, assign_context): - _ = assign_context - return [IntervalWindow(10, 20), IntervalWindow(20, 30)] - - def merge(self, existing_windows): - return existing_windows - - pipeline = TestPipeline() - numbers = pipeline | 'KVs' >> Create([(1, 10), (2, 20), (3, 30)]) - result = (numbers - | 'W' >> WindowInto(windowfn=TestWindowFn()) - | 'G' >> GroupByKey()) - assert_that( - result, equal_to([(1, [10]), (1, [10]), (2, [20]), - (2, [20]), (3, [30]), (3, [30])])) - pipeline.run() - - -if __name__ == '__main__': - logging.getLogger().setLevel(logging.INFO) - unittest.main() http://git-wip-us.apache.org/repos/asf/beam/blob/d5b90d83/sdks/python/apache_beam/transforms/ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 827bc83..68e4482 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -24,6 +24,7 @@ import re import unittest import hamcrest as hc +from nose.plugins.attrib import attr import apache_beam as beam from apache_beam.test_pipeline import TestPipeline @@ -189,6 +190,72 @@ class PTransformTest(unittest.TestCase): assert_that(r2.m, equal_to([3, 4, 5]), label='r2') pipeline.run() + @attr('ValidatesRunner') + def test_par_do_with_multiple_outputs_and_using_yield(self): + class SomeDoFn(beam.DoFn): + """A custom DoFn using yield.""" + + def process(self, context): + yield context.element + if context.element % 2 == 0: + yield pvalue.SideOutputValue('even', context.element) + else: + yield pvalue.SideOutputValue('odd', context.element) + + pipeline = TestPipeline() + nums = pipeline | 'Some Numbers' >> 
beam.Create([1, 2, 3, 4]) + results = nums | beam.ParDo( + 'ClassifyNumbers', SomeDoFn()).with_outputs('odd', 'even', main='main') + assert_that(results.main, equal_to([1, 2, 3, 4])) + assert_that(results.odd, equal_to([1, 3]), label='assert:odd') + assert_that(results.even, equal_to([2, 4]), label='assert:even') + pipeline.run() + + @attr('ValidatesRunner') + def test_par_do_with_multiple_outputs_and_using_return(self): + def some_fn(v): + if v % 2 == 0: + return [v, pvalue.SideOutputValue('even', v)] + else: + return [v, pvalue.SideOutputValue('odd', v)] + + pipeline = TestPipeline() + nums = pipeline | 'Some Numbers' >> beam.Create([1, 2, 3, 4]) + results = nums | beam.FlatMap( + 'ClassifyNumbers', some_fn).with_outputs('odd', 'even', main='main') + assert_that(results.main, equal_to([1, 2, 3, 4])) + assert_that(results.odd, equal_to([1, 3]), label='assert:odd') + assert_that(results.even, equal_to([2, 4]), label='assert:even') + pipeline.run() + + @attr('ValidatesRunner') + def test_undeclared_side_outputs(self): + pipeline = TestPipeline() + nums = pipeline | 'Some Numbers' >> beam.Create([1, 2, 3, 4]) + results = nums | beam.FlatMap( + 'ClassifyNumbers', + lambda x: [x, + pvalue.SideOutputValue('even' if x % 2 == 0 else 'odd', x)] + ).with_outputs() + assert_that(results[None], equal_to([1, 2, 3, 4])) + assert_that(results.odd, equal_to([1, 3]), label='assert:odd') + assert_that(results.even, equal_to([2, 4]), label='assert:even') + pipeline.run() + + @attr('ValidatesRunner') + def test_empty_side_outputs(self): + pipeline = TestPipeline() + nums = pipeline | 'Some Numbers' >> beam.Create([1, 3, 5]) + results = nums | beam.FlatMap( + 'ClassifyNumbers', + lambda x: [x, + pvalue.SideOutputValue('even' if x % 2 == 0 else 'odd', x)] + ).with_outputs() + assert_that(results[None], equal_to([1, 3, 5])) + assert_that(results.odd, equal_to([1, 3, 5]), label='assert:odd') + assert_that(results.even, equal_to([]), label='assert:even') + pipeline.run() + def 
test_do_requires_do_fn_returning_iterable(self): # This function is incorrect because it returns an object that isn't an # iterable. http://git-wip-us.apache.org/repos/asf/beam/blob/d5b90d83/sdks/python/apache_beam/transforms/sideinputs_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py index a327dc8..9384e7b 100644 --- a/sdks/python/apache_beam/transforms/sideinputs_test.py +++ b/sdks/python/apache_beam/transforms/sideinputs_test.py @@ -20,7 +20,10 @@ import logging import unittest +from nose.plugins.attrib import attr + import apache_beam as beam +from apache_beam.test_pipeline import TestPipeline from apache_beam.transforms import window from apache_beam.transforms.util import assert_that, equal_to @@ -28,7 +31,7 @@ from apache_beam.transforms.util import assert_that, equal_to class SideInputsTest(unittest.TestCase): def create_pipeline(self): - return beam.Pipeline('DirectRunner') + return TestPipeline('DirectRunner') def run_windowed_side_inputs(self, elements, main_window_fn, side_window_fn=None, @@ -125,6 +128,209 @@ class SideInputsTest(unittest.TestCase): (11, {'k11': 'v11'}), ]) + @attr('ValidatesRunner') + def test_empty_singleton_side_input(self): + pipeline = self.create_pipeline() + pcol = pipeline | 'start' >> beam.Create([1, 2]) + side = pipeline | 'side' >> beam.Create([]) # Empty side input. + + def my_fn(k, s): + # TODO(robertwb): Should this be an error as in Java? + v = ('empty' if isinstance(s, beam.pvalue.EmptySideInput) else 'full') + return [(k, v)] + result = pcol | 'compute' >> beam.FlatMap( + my_fn, beam.pvalue.AsSingleton(side)) + assert_that(result, equal_to([(1, 'empty'), (2, 'empty')])) + pipeline.run() + + # @attr('ValidatesRunner') + # TODO(BEAM-1124): Temporarily disable it due to test failed running on + # Dataflow service. 
+ def test_multi_valued_singleton_side_input(self): + pipeline = self.create_pipeline() + pcol = pipeline | 'start' >> beam.Create([1, 2]) + side = pipeline | 'side' >> beam.Create([3, 4]) # 2 values in side input. + pcol | 'compute' >> beam.FlatMap( # pylint: disable=expression-not-assigned + lambda x, s: [x * s], beam.pvalue.AsSingleton(side)) + with self.assertRaises(ValueError): + pipeline.run() + + @attr('ValidatesRunner') + def test_default_value_singleton_side_input(self): + pipeline = self.create_pipeline() + pcol = pipeline | 'start' >> beam.Create([1, 2]) + side = pipeline | 'side' >> beam.Create([]) # 0 values in side input. + result = pcol | beam.FlatMap( + lambda x, s: [x * s], beam.pvalue.AsSingleton(side, 10)) + assert_that(result, equal_to([10, 20])) + pipeline.run() + + @attr('ValidatesRunner') + def test_iterable_side_input(self): + pipeline = self.create_pipeline() + pcol = pipeline | 'start' >> beam.Create([1, 2]) + side = pipeline | 'side' >> beam.Create([3, 4]) # 2 values in side input. 
+ result = pcol | 'compute' >> beam.FlatMap( + lambda x, s: [x * y for y in s], + beam.pvalue.AsIter(side)) + assert_that(result, equal_to([3, 4, 6, 8])) + pipeline.run() + + @attr('ValidatesRunner') + def test_as_list_and_as_dict_side_inputs(self): + a_list = [5, 1, 3, 2, 9] + some_pairs = [('crouton', 17), ('supreme', None)] + pipeline = self.create_pipeline() + main_input = pipeline | 'main input' >> beam.Create([1]) + side_list = pipeline | 'side list' >> beam.Create(a_list) + side_pairs = pipeline | 'side pairs' >> beam.Create(some_pairs) + results = main_input | 'concatenate' >> beam.FlatMap( + lambda x, the_list, the_dict: [[x, the_list, the_dict]], + beam.pvalue.AsList(side_list), beam.pvalue.AsDict(side_pairs)) + + def matcher(expected_elem, expected_list, expected_pairs): + def match(actual): + [[actual_elem, actual_list, actual_dict]] = actual + equal_to([expected_elem])([actual_elem]) + equal_to(expected_list)(actual_list) + equal_to(expected_pairs)(actual_dict.iteritems()) + return match + + assert_that(results, matcher(1, a_list, some_pairs)) + pipeline.run() + + @attr('ValidatesRunner') + def test_as_singleton_without_unique_labels(self): + # This should succeed as calling beam.pvalue.AsSingleton on the same + # PCollection twice with the same defaults will return the same + # PCollectionView. 
+ a_list = [2] + pipeline = self.create_pipeline() + main_input = pipeline | 'main input' >> beam.Create([1]) + side_list = pipeline | 'side list' >> beam.Create(a_list) + results = main_input | beam.FlatMap( + lambda x, s1, s2: [[x, s1, s2]], + beam.pvalue.AsSingleton(side_list), beam.pvalue.AsSingleton(side_list)) + + def matcher(expected_elem, expected_singleton): + def match(actual): + [[actual_elem, actual_singleton1, actual_singleton2]] = actual + equal_to([expected_elem])([actual_elem]) + equal_to([expected_singleton])([actual_singleton1]) + equal_to([expected_singleton])([actual_singleton2]) + return match + + assert_that(results, matcher(1, 2)) + pipeline.run() + + @attr('ValidatesRunner') + def test_as_singleton_with_different_defaults_without_unique_labels(self): + # This should fail as beam.pvalue.AsSingleton with distinct default values + # should create distinct PCollectionViews with the same full_label. + a_list = [2] + pipeline = self.create_pipeline() + main_input = pipeline | 'main input' >> beam.Create([1]) + side_list = pipeline | 'side list' >> beam.Create(a_list) + + with self.assertRaises(RuntimeError) as e: + _ = main_input | beam.FlatMap( + lambda x, s1, s2: [[x, s1, s2]], + beam.pvalue.AsSingleton(side_list), + beam.pvalue.AsSingleton(side_list, default_value=3)) + self.assertTrue( + e.exception.message.startswith( + 'Transform "ViewAsSingleton(side list.None)" does not have a ' + 'stable unique label.')) + + @attr('ValidatesRunner') + def test_as_singleton_with_different_defaults_with_unique_labels(self): + a_list = [] + pipeline = self.create_pipeline() + main_input = pipeline | 'main input' >> beam.Create([1]) + side_list = pipeline | 'side list' >> beam.Create(a_list) + results = main_input | beam.FlatMap( + lambda x, s1, s2: [[x, s1, s2]], + beam.pvalue.AsSingleton('si1', side_list, default_value=2), + beam.pvalue.AsSingleton('si2', side_list, default_value=3)) + + def matcher(expected_elem, expected_singleton1,
expected_singleton2): + def match(actual): + [[actual_elem, actual_singleton1, actual_singleton2]] = actual + equal_to([expected_elem])([actual_elem]) + equal_to([expected_singleton1])([actual_singleton1]) + equal_to([expected_singleton2])([actual_singleton2]) + return match + + assert_that(results, matcher(1, 2, 3)) + pipeline.run() + + @attr('ValidatesRunner') + def test_as_list_without_unique_labels(self): + # This should succeed as calling beam.pvalue.AsList on the same + # PCollection twice will return the same PCollectionView. + a_list = [1, 2, 3] + pipeline = self.create_pipeline() + main_input = pipeline | 'main input' >> beam.Create([1]) + side_list = pipeline | 'side list' >> beam.Create(a_list) + results = main_input | beam.FlatMap( + lambda x, ls1, ls2: [[x, ls1, ls2]], + beam.pvalue.AsList(side_list), beam.pvalue.AsList(side_list)) + + def matcher(expected_elem, expected_list): + def match(actual): + [[actual_elem, actual_list1, actual_list2]] = actual + equal_to([expected_elem])([actual_elem]) + equal_to(expected_list)(actual_list1) + equal_to(expected_list)(actual_list2) + return match + + assert_that(results, matcher(1, [1, 2, 3])) + pipeline.run() + + @attr('ValidatesRunner') + def test_as_list_with_unique_labels(self): + a_list = [1, 2, 3] + pipeline = self.create_pipeline() + main_input = pipeline | 'main input' >> beam.Create([1]) + side_list = pipeline | 'side list' >> beam.Create(a_list) + results = main_input | beam.FlatMap( + lambda x, ls1, ls2: [[x, ls1, ls2]], + beam.pvalue.AsList(side_list), + beam.pvalue.AsList(side_list, label='label')) + + def matcher(expected_elem, expected_list): + def match(actual): + [[actual_elem, actual_list1, actual_list2]] = actual + equal_to([expected_elem])([actual_elem]) + equal_to(expected_list)(actual_list1) + equal_to(expected_list)(actual_list2) + return match + + assert_that(results, matcher(1, [1, 2, 3])) + pipeline.run() + + @attr('ValidatesRunner') + def test_as_dict_with_unique_labels(self): + 
some_kvs = [('a', 1), ('b', 2)] + pipeline = self.create_pipeline() + main_input = pipeline | 'main input' >> beam.Create([1]) + side_kvs = pipeline | 'side kvs' >> beam.Create(some_kvs) + results = main_input | beam.FlatMap( + lambda x, dct1, dct2: [[x, dct1, dct2]], + beam.pvalue.AsDict(side_kvs), + beam.pvalue.AsDict(side_kvs, label='label')) + + def matcher(expected_elem, expected_kvs): + def match(actual): + [[actual_elem, actual_dict1, actual_dict2]] = actual + equal_to([expected_elem])([actual_elem]) + equal_to(expected_kvs)(actual_dict1.iteritems()) + equal_to(expected_kvs)(actual_dict2.iteritems()) + return match + + assert_that(results, matcher(1, some_kvs)) + pipeline.run() + if __name__ == '__main__': logging.getLogger().setLevel(logging.DEBUG)