liferoad commented on code in PR #35675:
URL: https://github.com/apache/beam/pull/35675#discussion_r2228900506


##########
sdks/python/apache_beam/yaml/yaml_transform_test.py:
##########
@@ -217,6 +217,104 @@ def test_implicit_flatten(self):
           providers=TEST_PROVIDERS)
       assert_that(result, equal_to([1, 4, 9, 10000, 40000]))
 
+  def test_flatten_different_schemas_error(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      with self.assertRaisesRegex(
+          Exception, r"Cannot flatten PCollections with different schemas"):
+        _ = p | YamlTransform(
+            '''
+            type: composite
+            transforms:
+              - type: Create
+                name: Create1
+                config:
+                  elements:
+                    - {'ride_id': '1', 'passenger_count': 1}
+                    - {'ride_id': '2', 'passenger_count': 2}
+              - type: Create
+                name: Create2
+                config:
+                  elements:
+                    - {'ride_id': '3'}
+                    - {'ride_id': '4'}
+              - type: Flatten
+                name: Flatten1
+                input:
+                  - Create1
+                  - Create2
+            output: Flatten1
+            ''',
+            providers=TEST_PROVIDERS)
+
+  def test_flatten_compatible_schemas_success(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      result = p | YamlTransform(
+          '''
+          type: composite
+          transforms:
+            - type: Create
+              name: Create1
+              config:
+                elements:
+                  - {'ride_id': '1', 'passenger_count': 1}
+                  - {'ride_id': '2', 'passenger_count': 2}
+            - type: Create
+              name: Create2
+              config:
+                elements:
+                  - {'ride_id': '3', 'passenger_count': 3}
+                  - {'ride_id': '4', 'passenger_count': 4}
+            - type: Flatten
+              name: Flatten1
+              input:
+                - Create1
+                - Create2
+          output: Flatten1
+          ''',
+          providers=TEST_PROVIDERS)
+      # This should not raise an error since the schemas are identical
+      assert_that(
+          result,
+          equal_to([
+              beam.Row(ride_id='1', passenger_count=1),
+              beam.Row(ride_id='2', passenger_count=2),
+              beam.Row(ride_id='3', passenger_count=3),
+              beam.Row(ride_id='4', passenger_count=4)
+          ]))
+
+  def test_flatten_with_null_values_error(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      # This should raise an error because null values create different schema types
+      # (nullable logical type vs INT64)
+      with self.assertRaisesRegex(

Review Comment:
   @damccorm I don't like this, but Beam treats this case as having different
schemas, since passenger_count should be INT64, not None. So this validation now
reports an error for it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to