This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0972bc0170b Propagate correct boundedness when using multiple outputs 
(#29506)
0972bc0170b is described below

commit 0972bc0170b852ece0e8cccfec2f1d58dd438955
Author: Danny McCormick <[email protected]>
AuthorDate: Tue Nov 21 11:39:32 2023 -0500

    Propagate correct boundedness when using multiple outputs (#29506)
    
    * Add failing test
    
    * boundedness fix
---
 sdks/python/apache_beam/pvalue.py               |  7 ++++++-
 sdks/python/apache_beam/transforms/core_test.py | 28 +++++++++++++++++++++++++
 2 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/pvalue.py 
b/sdks/python/apache_beam/pvalue.py
index 90882651d0b..b783d61f95c 100644
--- a/sdks/python/apache_beam/pvalue.py
+++ b/sdks/python/apache_beam/pvalue.py
@@ -304,7 +304,12 @@ class DoOutputsTuple(object):
     assert self.producer is not None
     if tag is not None:
       self._transform.output_tags.add(tag)
-      pcoll = PCollection(self._pipeline, tag=tag, element_type=typehints.Any)
+      is_bounded = all(i.is_bounded for i in 
self.producer.main_inputs.values())
+      pcoll = PCollection(
+          self._pipeline,
+          tag=tag,
+          element_type=typehints.Any,
+          is_bounded=is_bounded)
       # Transfer the producer from the DoOutputsTuple to the resulting
       # PCollection.
       pcoll.producer = self.producer.parts[0]
diff --git a/sdks/python/apache_beam/transforms/core_test.py 
b/sdks/python/apache_beam/transforms/core_test.py
index 0fba2826613..a60974ceb70 100644
--- a/sdks/python/apache_beam/transforms/core_test.py
+++ b/sdks/python/apache_beam/transforms/core_test.py
@@ -108,6 +108,34 @@ class CreateTest(unittest.TestCase):
       assert warning_text in self._caplog.text
 
 
+class PartitionTest(unittest.TestCase):
+  def test_partition_boundedness(self):
+    def partition_fn(val, num_partitions):
+      return val % num_partitions
+
+    class UnboundedDoFn(beam.DoFn):
+      @beam.DoFn.unbounded_per_element()
+      def process(self, element):
+        yield element
+
+    with beam.testing.test_pipeline.TestPipeline() as p:
+      source = p | beam.Create([1, 2, 3, 4, 5])
+      p1, p2, p3 = source | "bounded" >> beam.Partition(partition_fn, 3)
+
+      self.assertEqual(source.is_bounded, True)
+      self.assertEqual(p1.is_bounded, True)
+      self.assertEqual(p2.is_bounded, True)
+      self.assertEqual(p3.is_bounded, True)
+
+      unbounded = source | beam.ParDo(UnboundedDoFn())
+      p4, p5, p6 = unbounded | "unbounded" >> beam.Partition(partition_fn, 3)
+
+      self.assertEqual(unbounded.is_bounded, False)
+      self.assertEqual(p4.is_bounded, False)
+      self.assertEqual(p5.is_bounded, False)
+      self.assertEqual(p6.is_bounded, False)
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()

Reply via email to