Repository: beam
Updated Branches:
  refs/heads/python-sdk af49908b8 -> d0dc1f375

Revert "Remove"

This reverts commit d5b90d8383e662e803ea79b31661250a043bcfd2.


Branch: refs/heads/python-sdk
Commit: 96fcc7d31c2540f867c3a73903c2aa99183a6b8b
Parents: af49908
Author: Robert Bradshaw <>
Authored: Tue Jan 24 09:28:38 2017 -0800
Committer: Robert Bradshaw <>
Committed: Tue Jan 24 09:28:38 2017 -0800

 sdks/python/apache_beam/        | 418 +++++++++++++++++++
 .../apache_beam/transforms/   |  67 ---
 .../apache_beam/transforms/   | 208 +--------
 3 files changed, 419 insertions(+), 274 deletions(-)
diff --git a/sdks/python/apache_beam/ 
new file mode 100644
index 0000000..f410230
--- /dev/null
+++ b/sdks/python/apache_beam/
@@ -0,0 +1,418 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Integration tests for the dataflow package."""
+from __future__ import absolute_import
+import logging
+import re
+import unittest
+import apache_beam as beam
+from apache_beam.pvalue import AsDict
+from apache_beam.pvalue import AsIter as AllOf
+from apache_beam.pvalue import AsList
+from apache_beam.pvalue import AsSingleton
+from apache_beam.pvalue import EmptySideInput
+from apache_beam.pvalue import SideOutputValue
+from apache_beam.test_pipeline import TestPipeline
+from apache_beam.transforms import Create
+from apache_beam.transforms import DoFn
+from apache_beam.transforms import FlatMap
+from apache_beam.transforms import GroupByKey
+from apache_beam.transforms import Map
+from apache_beam.transforms import ParDo
+from apache_beam.transforms import WindowInto
+from apache_beam.transforms.util import assert_that
+from apache_beam.transforms.util import equal_to
+from apache_beam.transforms.window import IntervalWindow
+from apache_beam.transforms.window import WindowFn
+from nose.plugins.attrib import attr
+class DataflowTest(unittest.TestCase):
+  """Dataflow integration tests."""
+  SAMPLE_DATA = ['aa bb cc aa bb aa \n'] * 10
+  SAMPLE_RESULT = [('cc', 10), ('bb', 20), ('aa', 30)]
+  @beam.ptransform_fn
+  def Count(pcoll):  # pylint: disable=invalid-name, no-self-argument
+    """A Count transform: v, ... => (v, n), ..."""
+    return (pcoll
+            | 'AddCount' >> Map(lambda x: (x, 1))
+            | 'GroupCounts' >> GroupByKey()
+            | 'AddCounts' >> Map(lambda (x, ones): (x, sum(ones))))
+  @attr('ValidatesRunner')
+  def test_word_count(self):
+    pipeline = TestPipeline()
+    lines = pipeline | 'SomeWords' >> Create(DataflowTest.SAMPLE_DATA)
+    result = (
+        (lines | 'GetWords' >> FlatMap(lambda x: re.findall(r'\w+', x)))
+        .apply('CountWords', DataflowTest.Count))
+    assert_that(result, equal_to(DataflowTest.SAMPLE_RESULT))
+  @attr('ValidatesRunner')
+  def test_map(self):
+    pipeline = TestPipeline()
+    lines = pipeline | 'input' >> Create(['a', 'b', 'c'])
+    result = (lines
+              | 'upper' >> Map(str.upper)
+              | 'prefix' >> Map(lambda x, prefix: prefix + x, 'foo-'))
+    assert_that(result, equal_to(['foo-A', 'foo-B', 'foo-C']))
+  @attr('ValidatesRunner')
+  def test_par_do_with_side_input_as_arg(self):
+    pipeline = TestPipeline()
+    words_list = ['aa', 'bb', 'cc']
+    words = pipeline | 'SomeWords' >> Create(words_list)
+    prefix = pipeline | 'SomeString' >> Create(['xyz'])  # side in
+    suffix = 'zyx'
+    result = words | FlatMap(
+        'DecorateWords',
+        lambda x, pfx, sfx: ['%s-%s-%s' % (pfx, x, sfx)],
+        AsSingleton(prefix), suffix)
+    assert_that(result, equal_to(['xyz-%s-zyx' % x for x in words_list]))
+  @attr('ValidatesRunner')
+  def test_par_do_with_side_input_as_keyword_arg(self):
+    pipeline = TestPipeline()
+    words_list = ['aa', 'bb', 'cc']
+    words = pipeline | 'SomeWords' >> Create(words_list)
+    prefix = 'zyx'
+    suffix = pipeline | 'SomeString' >> Create(['xyz'])  # side in
+    result = words | FlatMap(
+        'DecorateWords',
+        lambda x, pfx, sfx: ['%s-%s-%s' % (pfx, x, sfx)],
+        prefix, sfx=AsSingleton(suffix))
+    assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
+  @attr('ValidatesRunner')
+  def test_par_do_with_do_fn_object(self):
+    class SomeDoFn(DoFn):
+      """A custom DoFn for a FlatMap transform."""
+      def process(self, context, prefix, suffix):
+        return ['%s-%s-%s' % (prefix, context.element, suffix)]
+    pipeline = TestPipeline()
+    words_list = ['aa', 'bb', 'cc']
+    words = pipeline | 'SomeWords' >> Create(words_list)
+    prefix = 'zyx'
+    suffix = pipeline | 'SomeString' >> Create(['xyz'])  # side in
+    result = words | 'DecorateWordsDoFn' >> ParDo(
+        SomeDoFn(), prefix, suffix=AsSingleton(suffix))
+    assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
+  @attr('ValidatesRunner')
+  def test_par_do_with_multiple_outputs_and_using_yield(self):
+    class SomeDoFn(DoFn):
+      """A custom DoFn using yield."""
+      def process(self, context):
+        yield context.element
+        if context.element % 2 == 0:
+          yield SideOutputValue('even', context.element)
+        else:
+          yield SideOutputValue('odd', context.element)
+    pipeline = TestPipeline()
+    nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4])
+    results = nums | ParDo(
+        'ClassifyNumbers', SomeDoFn()).with_outputs('odd', 'even', main='main')
+    assert_that(results.main, equal_to([1, 2, 3, 4]))
+    assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
+    assert_that(results.even, equal_to([2, 4]), label='assert:even')
+  @attr('ValidatesRunner')
+  def test_par_do_with_multiple_outputs_and_using_return(self):
+    def some_fn(v):
+      if v % 2 == 0:
+        return [v, SideOutputValue('even', v)]
+      else:
+        return [v, SideOutputValue('odd', v)]
+    pipeline = TestPipeline()
+    nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4])
+    results = nums | FlatMap(
+        'ClassifyNumbers', some_fn).with_outputs('odd', 'even', main='main')
+    assert_that(results.main, equal_to([1, 2, 3, 4]))
+    assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
+    assert_that(results.even, equal_to([2, 4]), label='assert:even')
+  @attr('ValidatesRunner')
+  def test_empty_singleton_side_input(self):
+    pipeline = TestPipeline()
+    pcol = pipeline | 'start' >> Create([1, 2])
+    side = pipeline | 'side' >> Create([])  # Empty side input.
+    def my_fn(k, s):
+      v = ('empty' if isinstance(s, EmptySideInput) else 'full')
+      return [(k, v)]
+    result = pcol | 'compute' >> FlatMap(my_fn, AsSingleton(side))
+    assert_that(result, equal_to([(1, 'empty'), (2, 'empty')]))
+  # @attr('ValidatesRunner')
+  # TODO(BEAM-1124): Temporarily disable it due to test failed running on
+  # Dataflow service.
+  def test_multi_valued_singleton_side_input(self):
+    pipeline = TestPipeline()
+    pcol = pipeline | 'start' >> Create([1, 2])
+    side = pipeline | 'side' >> Create([3, 4])  # 2 values in side input.
+    pcol | 'compute' >> FlatMap(lambda x, s: [x * s], AsSingleton(side))  # 
pylint: disable=expression-not-assigned
+    with self.assertRaises(ValueError):
+  @attr('ValidatesRunner')
+  def test_default_value_singleton_side_input(self):
+    pipeline = TestPipeline()
+    pcol = pipeline | 'start' >> Create([1, 2])
+    side = pipeline | 'side' >> Create([])  # 0 values in side input.
+    result = pcol | FlatMap(lambda x, s: [x * s], AsSingleton(side, 10))
+    assert_that(result, equal_to([10, 20]))
+  @attr('ValidatesRunner')
+  def test_iterable_side_input(self):
+    pipeline = TestPipeline()
+    pcol = pipeline | 'start' >> Create([1, 2])
+    side = pipeline | 'side' >> Create([3, 4])  # 2 values in side input.
+    result = pcol | FlatMap('compute',
+                            lambda x, s: [x * y for y in s], AllOf(side))
+    assert_that(result, equal_to([3, 4, 6, 8]))
+  @attr('ValidatesRunner')
+  def test_undeclared_side_outputs(self):
+    pipeline = TestPipeline()
+    nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4])
+    results = nums | FlatMap(
+        'ClassifyNumbers',
+        lambda x: [x, SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
+    ).with_outputs()
+    assert_that(results[None], equal_to([1, 2, 3, 4]))
+    assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
+    assert_that(results.even, equal_to([2, 4]), label='assert:even')
+  @attr('ValidatesRunner')
+  def test_empty_side_outputs(self):
+    pipeline = TestPipeline()
+    nums = pipeline | 'Some Numbers' >> Create([1, 3, 5])
+    results = nums | FlatMap(
+        'ClassifyNumbers',
+        lambda x: [x, SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
+    ).with_outputs()
+    assert_that(results[None], equal_to([1, 3, 5]))
+    assert_that(results.odd, equal_to([1, 3, 5]), label='assert:odd')
+    assert_that(results.even, equal_to([]), label='assert:even')
+  @attr('ValidatesRunner')
+  def test_as_list_and_as_dict_side_inputs(self):
+    a_list = [5, 1, 3, 2, 9]
+    some_pairs = [('crouton', 17), ('supreme', None)]
+    pipeline = TestPipeline()
+    main_input = pipeline | 'main input' >> Create([1])
+    side_list = pipeline | 'side list' >> Create(a_list)
+    side_pairs = pipeline | 'side pairs' >> Create(some_pairs)
+    results = main_input | FlatMap(
+        'concatenate',
+        lambda x, the_list, the_dict: [[x, the_list, the_dict]],
+        AsList(side_list), AsDict(side_pairs))
+    def  matcher(expected_elem, expected_list, expected_pairs):
+      def match(actual):
+        [[actual_elem, actual_list, actual_dict]] = actual
+        equal_to([expected_elem])([actual_elem])
+        equal_to(expected_list)(actual_list)
+        equal_to(expected_pairs)(actual_dict.iteritems())
+      return match
+    assert_that(results, matcher(1, a_list, some_pairs))
+  @attr('ValidatesRunner')
+  def test_as_singleton_without_unique_labels(self):
+    # This should succeed as calling AsSingleton on the same PCollection twice
+    # with the same defaults will return the same PCollectionView.
+    a_list = [2]
+    pipeline = TestPipeline()
+    main_input = pipeline | 'main input' >> Create([1])
+    side_list = pipeline | 'side list' >> Create(a_list)
+    results = main_input | FlatMap(
+        'test',
+        lambda x, s1, s2: [[x, s1, s2]],
+        AsSingleton(side_list), AsSingleton(side_list))
+    def  matcher(expected_elem, expected_singleton):
+      def match(actual):
+        [[actual_elem, actual_singleton1, actual_singleton2]] = actual
+        equal_to([expected_elem])([actual_elem])
+        equal_to([expected_singleton])([actual_singleton1])
+        equal_to([expected_singleton])([actual_singleton2])
+      return match
+    assert_that(results, matcher(1, 2))
+  @attr('ValidatesRunner')
+  def test_as_singleton_with_different_defaults_without_unique_labels(self):
+    # This should fail as AsSingleton with distinct default values should 
+    # distinct PCollectionViews with the same full_label.
+    a_list = [2]
+    pipeline = TestPipeline()
+    main_input = pipeline | 'main input' >> Create([1])
+    side_list = pipeline | 'side list' >> Create(a_list)
+    with self.assertRaises(RuntimeError) as e:
+      _ = main_input | FlatMap(
+          'test',
+          lambda x, s1, s2: [[x, s1, s2]],
+          AsSingleton(side_list), AsSingleton(side_list, default_value=3))
+    self.assertTrue(
+        e.exception.message.startswith(
+            'Transform "ViewAsSingleton(side list.None)" does not have a '
+            'stable unique label.'))
+  @attr('ValidatesRunner')
+  def test_as_singleton_with_different_defaults_with_unique_labels(self):
+    a_list = []
+    pipeline = TestPipeline()
+    main_input = pipeline | 'main input' >> Create([1])
+    side_list = pipeline | 'side list' >> Create(a_list)
+    results = main_input | FlatMap(
+        'test',
+        lambda x, s1, s2: [[x, s1, s2]],
+        AsSingleton('si1', side_list, default_value=2),
+        AsSingleton('si2', side_list, default_value=3))
+    def  matcher(expected_elem, expected_singleton1, expected_singleton2):
+      def match(actual):
+        [[actual_elem, actual_singleton1, actual_singleton2]] = actual
+        equal_to([expected_elem])([actual_elem])
+        equal_to([expected_singleton1])([actual_singleton1])
+        equal_to([expected_singleton2])([actual_singleton2])
+      return match
+    assert_that(results, matcher(1, 2, 3))
+  @attr('ValidatesRunner')
+  def test_as_list_without_unique_labels(self):
+    # This should succeed as calling AsList on the same PCollection twice will
+    # return the same PCollectionView.
+    a_list = [1, 2, 3]
+    pipeline = TestPipeline()
+    main_input = pipeline | 'main input' >> Create([1])
+    side_list = pipeline | 'side list' >> Create(a_list)
+    results = main_input | FlatMap(
+        'test',
+        lambda x, ls1, ls2: [[x, ls1, ls2]],
+        AsList(side_list), AsList(side_list))
+    def  matcher(expected_elem, expected_list):
+      def match(actual):
+        [[actual_elem, actual_list1, actual_list2]] = actual
+        equal_to([expected_elem])([actual_elem])
+        equal_to(expected_list)(actual_list1)
+        equal_to(expected_list)(actual_list2)
+      return match
+    assert_that(results, matcher(1, [1, 2, 3]))
+  @attr('ValidatesRunner')
+  def test_as_list_with_unique_labels(self):
+    a_list = [1, 2, 3]
+    pipeline = TestPipeline()
+    main_input = pipeline | 'main input' >> Create([1])
+    side_list = pipeline | 'side list' >> Create(a_list)
+    results = main_input | FlatMap(
+        'test',
+        lambda x, ls1, ls2: [[x, ls1, ls2]],
+        AsList(side_list), AsList(side_list, label='label'))
+    def  matcher(expected_elem, expected_list):
+      def match(actual):
+        [[actual_elem, actual_list1, actual_list2]] = actual
+        equal_to([expected_elem])([actual_elem])
+        equal_to(expected_list)(actual_list1)
+        equal_to(expected_list)(actual_list2)
+      return match
+    assert_that(results, matcher(1, [1, 2, 3]))
+  @attr('ValidatesRunner')
+  def test_as_dict_with_unique_labels(self):
+    some_kvs = [('a', 1), ('b', 2)]
+    pipeline = TestPipeline()
+    main_input = pipeline | 'main input' >> Create([1])
+    side_kvs = pipeline | 'side kvs' >> Create(some_kvs)
+    results = main_input | FlatMap(
+        'test',
+        lambda x, dct1, dct2: [[x, dct1, dct2]],
+        AsDict(side_kvs), AsDict(side_kvs, label='label'))
+    def  matcher(expected_elem, expected_kvs):
+      def match(actual):
+        [[actual_elem, actual_dict1, actual_dict2]] = actual
+        equal_to([expected_elem])([actual_elem])
+        equal_to(expected_kvs)(actual_dict1.iteritems())
+        equal_to(expected_kvs)(actual_dict2.iteritems())
+      return match
+    assert_that(results, matcher(1, some_kvs))
+  @attr('ValidatesRunner')
+  def test_window_transform(self):
+    class TestWindowFn(WindowFn):
+      """Windowing function adding two disjoint windows to each element."""
+      def assign(self, assign_context):
+        _ = assign_context
+        return [IntervalWindow(10, 20), IntervalWindow(20, 30)]
+      def merge(self, existing_windows):
+        return existing_windows
+    pipeline = TestPipeline()
+    numbers = pipeline | 'KVs' >> Create([(1, 10), (2, 20), (3, 30)])
+    result = (numbers
+              | 'W' >> WindowInto(windowfn=TestWindowFn())
+              | 'G' >> GroupByKey())
+    assert_that(
+        result, equal_to([(1, [10]), (1, [10]), (2, [20]),
+                          (2, [20]), (3, [30]), (3, [30])]))
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
diff --git a/sdks/python/apache_beam/transforms/ 
index 68e4482..827bc83 100644
--- a/sdks/python/apache_beam/transforms/
+++ b/sdks/python/apache_beam/transforms/
@@ -24,7 +24,6 @@ import re
 import unittest
 import hamcrest as hc
-from nose.plugins.attrib import attr
 import apache_beam as beam
 from apache_beam.test_pipeline import TestPipeline
@@ -190,72 +189,6 @@ class PTransformTest(unittest.TestCase):
     assert_that(r2.m, equal_to([3, 4, 5]), label='r2')
-  @attr('ValidatesRunner')
-  def test_par_do_with_multiple_outputs_and_using_yield(self):
-    class SomeDoFn(beam.DoFn):
-      """A custom DoFn using yield."""
-      def process(self, context):
-        yield context.element
-        if context.element % 2 == 0:
-          yield pvalue.SideOutputValue('even', context.element)
-        else:
-          yield pvalue.SideOutputValue('odd', context.element)
-    pipeline = TestPipeline()
-    nums = pipeline | 'Some Numbers' >> beam.Create([1, 2, 3, 4])
-    results = nums | beam.ParDo(
-        'ClassifyNumbers', SomeDoFn()).with_outputs('odd', 'even', main='main')
-    assert_that(results.main, equal_to([1, 2, 3, 4]))
-    assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
-    assert_that(results.even, equal_to([2, 4]), label='assert:even')
-  @attr('ValidatesRunner')
-  def test_par_do_with_multiple_outputs_and_using_return(self):
-    def some_fn(v):
-      if v % 2 == 0:
-        return [v, pvalue.SideOutputValue('even', v)]
-      else:
-        return [v, pvalue.SideOutputValue('odd', v)]
-    pipeline = TestPipeline()
-    nums = pipeline | 'Some Numbers' >> beam.Create([1, 2, 3, 4])
-    results = nums | beam.FlatMap(
-        'ClassifyNumbers', some_fn).with_outputs('odd', 'even', main='main')
-    assert_that(results.main, equal_to([1, 2, 3, 4]))
-    assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
-    assert_that(results.even, equal_to([2, 4]), label='assert:even')
-  @attr('ValidatesRunner')
-  def test_undeclared_side_outputs(self):
-    pipeline = TestPipeline()
-    nums = pipeline | 'Some Numbers' >> beam.Create([1, 2, 3, 4])
-    results = nums | beam.FlatMap(
-        'ClassifyNumbers',
-        lambda x: [x,
-                   pvalue.SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
-    ).with_outputs()
-    assert_that(results[None], equal_to([1, 2, 3, 4]))
-    assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
-    assert_that(results.even, equal_to([2, 4]), label='assert:even')
-  @attr('ValidatesRunner')
-  def test_empty_side_outputs(self):
-    pipeline = TestPipeline()
-    nums = pipeline | 'Some Numbers' >> beam.Create([1, 3, 5])
-    results = nums | beam.FlatMap(
-        'ClassifyNumbers',
-        lambda x: [x,
-                   pvalue.SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
-    ).with_outputs()
-    assert_that(results[None], equal_to([1, 3, 5]))
-    assert_that(results.odd, equal_to([1, 3, 5]), label='assert:odd')
-    assert_that(results.even, equal_to([]), label='assert:even')
   def test_do_requires_do_fn_returning_iterable(self):
     # This function is incorrect because it returns an object that isn't an
     # iterable.
diff --git a/sdks/python/apache_beam/transforms/ 
index 9384e7b..a327dc8 100644
--- a/sdks/python/apache_beam/transforms/
+++ b/sdks/python/apache_beam/transforms/
@@ -20,10 +20,7 @@
 import logging
 import unittest
-from nose.plugins.attrib import attr
 import apache_beam as beam
-from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms import window
 from apache_beam.transforms.util import assert_that, equal_to
@@ -31,7 +28,7 @@ from apache_beam.transforms.util import assert_that, equal_to
 class SideInputsTest(unittest.TestCase):
   def create_pipeline(self):
-    return TestPipeline('DirectRunner')
+    return beam.Pipeline('DirectRunner')
   def run_windowed_side_inputs(self, elements, main_window_fn,
@@ -128,209 +125,6 @@ class SideInputsTest(unittest.TestCase):
             (11, {'k11': 'v11'}),
-  @attr('ValidatesRunner')
-  def test_empty_singleton_side_input(self):
-    pipeline = self.create_pipeline()
-    pcol = pipeline | 'start' >> beam.Create([1, 2])
-    side = pipeline | 'side' >> beam.Create([])  # Empty side input.
-    def my_fn(k, s):
-      # TODO(robertwb): Should this be an error as in Java?
-      v = ('empty' if isinstance(s, beam.pvalue.EmptySideInput) else 'full')
-      return [(k, v)]
-    result = pcol | 'compute' >> beam.FlatMap(
-        my_fn, beam.pvalue.AsSingleton(side))
-    assert_that(result, equal_to([(1, 'empty'), (2, 'empty')]))
-  # @attr('ValidatesRunner')
-  # TODO(BEAM-1124): Temporarily disable it due to test failed running on
-  # Dataflow service.
-  def test_multi_valued_singleton_side_input(self):
-    pipeline = self.create_pipeline()
-    pcol = pipeline | 'start' >> beam.Create([1, 2])
-    side = pipeline | 'side' >> beam.Create([3, 4])  # 2 values in side input.
-    pcol | 'compute' >> beam.FlatMap(  # pylint: 
-        lambda x, s: [x * s], beam.pvalue.AsSingleton(side))
-    with self.assertRaises(ValueError):
-  @attr('ValidatesRunner')
-  def test_default_value_singleton_side_input(self):
-    pipeline = self.create_pipeline()
-    pcol = pipeline | 'start' >> beam.Create([1, 2])
-    side = pipeline | 'side' >> beam.Create([])  # 0 values in side input.
-    result = pcol | beam.FlatMap(
-        lambda x, s: [x * s], beam.pvalue.AsSingleton(side, 10))
-    assert_that(result, equal_to([10, 20]))
-  @attr('ValidatesRunner')
-  def test_iterable_side_input(self):
-    pipeline = self.create_pipeline()
-    pcol = pipeline | 'start' >> beam.Create([1, 2])
-    side = pipeline | 'side' >> beam.Create([3, 4])  # 2 values in side input.
-    result = pcol | 'compute' >> beam.FlatMap(
-        lambda x, s: [x * y for y in s],
-        beam.pvalue.AsIter(side))
-    assert_that(result, equal_to([3, 4, 6, 8]))
-  @attr('ValidatesRunner')
-  def test_as_list_and_as_dict_side_inputs(self):
-    a_list = [5, 1, 3, 2, 9]
-    some_pairs = [('crouton', 17), ('supreme', None)]
-    pipeline = self.create_pipeline()
-    main_input = pipeline | 'main input' >> beam.Create([1])
-    side_list = pipeline | 'side list' >> beam.Create(a_list)
-    side_pairs = pipeline | 'side pairs' >> beam.Create(some_pairs)
-    results = main_input | 'concatenate' >> beam.FlatMap(
-        lambda x, the_list, the_dict: [[x, the_list, the_dict]],
-        beam.pvalue.AsList(side_list), beam.pvalue.AsDict(side_pairs))
-    def  matcher(expected_elem, expected_list, expected_pairs):
-      def match(actual):
-        [[actual_elem, actual_list, actual_dict]] = actual
-        equal_to([expected_elem])([actual_elem])
-        equal_to(expected_list)(actual_list)
-        equal_to(expected_pairs)(actual_dict.iteritems())
-      return match
-    assert_that(results, matcher(1, a_list, some_pairs))
-  @attr('ValidatesRunner')
-  def test_as_singleton_without_unique_labels(self):
-    # This should succeed as calling beam.pvalue.AsSingleton on the same
-    # PCollection twice with the same defaults will return the same
-    # PCollectionView.
-    a_list = [2]
-    pipeline = self.create_pipeline()
-    main_input = pipeline | 'main input' >> beam.Create([1])
-    side_list = pipeline | 'side list' >> beam.Create(a_list)
-    results = main_input | beam.FlatMap(
-        lambda x, s1, s2: [[x, s1, s2]],
-        beam.pvalue.AsSingleton(side_list), beam.pvalue.AsSingleton(side_list))
-    def  matcher(expected_elem, expected_singleton):
-      def match(actual):
-        [[actual_elem, actual_singleton1, actual_singleton2]] = actual
-        equal_to([expected_elem])([actual_elem])
-        equal_to([expected_singleton])([actual_singleton1])
-        equal_to([expected_singleton])([actual_singleton2])
-      return match
-    assert_that(results, matcher(1, 2))
-  @attr('ValidatesRunner')
-  def test_as_singleton_with_different_defaults_without_unique_labels(self):
-    # This should fail as beam.pvalue.AsSingleton with distinct default values
-    # should beam.Create distinct PCollectionViews with the same full_label.
-    a_list = [2]
-    pipeline = self.create_pipeline()
-    main_input = pipeline | 'main input' >> beam.Create([1])
-    side_list = pipeline | 'side list' >> beam.Create(a_list)
-    with self.assertRaises(RuntimeError) as e:
-      _ = main_input | beam.FlatMap(
-          lambda x, s1, s2: [[x, s1, s2]],
-          beam.pvalue.AsSingleton(side_list),
-          beam.pvalue.AsSingleton(side_list, default_value=3))
-    self.assertTrue(
-        e.exception.message.startswith(
-            'Transform "ViewAsSingleton(side list.None)" does not have a '
-            'stable unique label.'))
-  @attr('ValidatesRunner')
-  def test_as_singleton_with_different_defaults_with_unique_labels(self):
-    a_list = []
-    pipeline = self.create_pipeline()
-    main_input = pipeline | 'main input' >> beam.Create([1])
-    side_list = pipeline | 'side list' >> beam.Create(a_list)
-    results = main_input | beam.FlatMap(
-        lambda x, s1, s2: [[x, s1, s2]],
-        beam.pvalue.AsSingleton('si1', side_list, default_value=2),
-        beam.pvalue.AsSingleton('si2', side_list, default_value=3))
-    def  matcher(expected_elem, expected_singleton1, expected_singleton2):
-      def match(actual):
-        [[actual_elem, actual_singleton1, actual_singleton2]] = actual
-        equal_to([expected_elem])([actual_elem])
-        equal_to([expected_singleton1])([actual_singleton1])
-        equal_to([expected_singleton2])([actual_singleton2])
-      return match
-    assert_that(results, matcher(1, 2, 3))
-  @attr('ValidatesRunner')
-  def test_as_list_without_unique_labels(self):
-    # This should succeed as calling beam.pvalue.AsList on the same
-    # PCollection twice will return the same PCollectionView.
-    a_list = [1, 2, 3]
-    pipeline = self.create_pipeline()
-    main_input = pipeline | 'main input' >> beam.Create([1])
-    side_list = pipeline | 'side list' >> beam.Create(a_list)
-    results = main_input | beam.FlatMap(
-        lambda x, ls1, ls2: [[x, ls1, ls2]],
-        beam.pvalue.AsList(side_list), beam.pvalue.AsList(side_list))
-    def  matcher(expected_elem, expected_list):
-      def match(actual):
-        [[actual_elem, actual_list1, actual_list2]] = actual
-        equal_to([expected_elem])([actual_elem])
-        equal_to(expected_list)(actual_list1)
-        equal_to(expected_list)(actual_list2)
-      return match
-    assert_that(results, matcher(1, [1, 2, 3]))
-  @attr('ValidatesRunner')
-  def test_as_list_with_unique_labels(self):
-    a_list = [1, 2, 3]
-    pipeline = self.create_pipeline()
-    main_input = pipeline | 'main input' >> beam.Create([1])
-    side_list = pipeline | 'side list' >> beam.Create(a_list)
-    results = main_input | beam.FlatMap(
-        lambda x, ls1, ls2: [[x, ls1, ls2]],
-        beam.pvalue.AsList(side_list),
-        beam.pvalue.AsList(side_list, label='label'))
-    def  matcher(expected_elem, expected_list):
-      def match(actual):
-        [[actual_elem, actual_list1, actual_list2]] = actual
-        equal_to([expected_elem])([actual_elem])
-        equal_to(expected_list)(actual_list1)
-        equal_to(expected_list)(actual_list2)
-      return match
-    assert_that(results, matcher(1, [1, 2, 3]))
-  @attr('ValidatesRunner')
-  def test_as_dict_with_unique_labels(self):
-    some_kvs = [('a', 1), ('b', 2)]
-    pipeline = self.create_pipeline()
-    main_input = pipeline | 'main input' >> beam.Create([1])
-    side_kvs = pipeline | 'side kvs' >> beam.Create(some_kvs)
-    results = main_input | beam.FlatMap(
-        lambda x, dct1, dct2: [[x, dct1, dct2]],
-        beam.pvalue.AsDict(side_kvs),
-        beam.pvalue.AsDict(side_kvs, label='label'))
-    def  matcher(expected_elem, expected_kvs):
-      def match(actual):
-        [[actual_elem, actual_dict1, actual_dict2]] = actual
-        equal_to([expected_elem])([actual_elem])
-        equal_to(expected_kvs)(actual_dict1.iteritems())
-        equal_to(expected_kvs)(actual_dict2.iteritems())
-      return match
-    assert_that(results, matcher(1, some_kvs))
 if __name__ == '__main__':

Reply via email to