This is an automated email from the ASF dual-hosted git repository. damccorm pushed a commit to branch users/damccorm/mo_bounded in repository https://gitbox.apache.org/repos/asf/beam.git
commit 89d0f9af35e09d62ced7d22b695e9d7534e0a66a Author: Danny McCormick <[email protected]> AuthorDate: Tue Nov 21 08:27:02 2023 -0500 Add failing test --- sdks/python/apache_beam/transforms/core_test.py | 27 +++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/sdks/python/apache_beam/transforms/core_test.py b/sdks/python/apache_beam/transforms/core_test.py index 0fba2826613..4f5716d7475 100644 --- a/sdks/python/apache_beam/transforms/core_test.py +++ b/sdks/python/apache_beam/transforms/core_test.py @@ -108,6 +108,33 @@ class CreateTest(unittest.TestCase): assert warning_text in self._caplog.text +class PartitionTest(unittest.TestCase): + def test_partition_boundedness(self): + def partition_fn(val, num_partitions): + return val % num_partitions + + class UnboundedDoFn(beam.DoFn): + @beam.DoFn.unbounded_per_element() + def process(self, element): + yield element + + with beam.testing.test_pipeline.TestPipeline() as p: + source = p | beam.Create([1,2,3,4,5]) + p1, p2, p3 = source | "bounded" >> beam.Partition(partition_fn, 3) + + self.assertEqual(source.is_bounded, True) + self.assertEqual(p1.is_bounded, True) + self.assertEqual(p2.is_bounded, True) + self.assertEqual(p3.is_bounded, True) + + unbounded_pcoll = source | beam.ParDo(UnboundedDoFn()) + p4, p5, p6 = unbounded_pcoll | "unbounded" >> beam.Partition(partition_fn, 3) + + self.assertEqual(unbounded_pcoll.is_bounded, False) + self.assertEqual(p4.is_bounded, False) + self.assertEqual(p5.is_bounded, False) + self.assertEqual(p6.is_bounded, False) + if __name__ == '__main__': logging.getLogger().setLevel(logging.INFO) unittest.main()
