This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch users/damccorm/mo_bounded
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 89d0f9af35e09d62ced7d22b695e9d7534e0a66a
Author: Danny McCormick <[email protected]>
AuthorDate: Tue Nov 21 08:27:02 2023 -0500

    Add failing test
---
 sdks/python/apache_beam/transforms/core_test.py | 27 +++++++++++++++++++++++++
 1 file changed, 27 insertions(+)

diff --git a/sdks/python/apache_beam/transforms/core_test.py 
b/sdks/python/apache_beam/transforms/core_test.py
index 0fba2826613..4f5716d7475 100644
--- a/sdks/python/apache_beam/transforms/core_test.py
+++ b/sdks/python/apache_beam/transforms/core_test.py
@@ -108,6 +108,33 @@ class CreateTest(unittest.TestCase):
       assert warning_text in self._caplog.text
 
 
+class PartitionTest(unittest.TestCase):
+  def test_partition_boundedness(self):
+    def partition_fn(val, num_partitions):
+      return val % num_partitions
+
+    class UnboundedDoFn(beam.DoFn):
+      @beam.DoFn.unbounded_per_element()
+      def process(self, element):
+        yield element
+
+    with beam.testing.test_pipeline.TestPipeline() as p:
+      source = p | beam.Create([1,2,3,4,5])
+      p1, p2, p3 = source | "bounded" >> beam.Partition(partition_fn, 3)
+
+      self.assertEqual(source.is_bounded, True)
+      self.assertEqual(p1.is_bounded, True)
+      self.assertEqual(p2.is_bounded, True)
+      self.assertEqual(p3.is_bounded, True)
+
+      unbounded_pcoll = source | beam.ParDo(UnboundedDoFn())
+      p4, p5, p6 = unbounded_pcoll | "unbounded" >> 
beam.Partition(partition_fn, 3)
+
+      self.assertEqual(unbounded_pcoll.is_bounded, False)
+      self.assertEqual(p4.is_bounded, False)
+      self.assertEqual(p5.is_bounded, False)
+      self.assertEqual(p6.is_bounded, False)
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()

Reply via email to