This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ac0fe431e6 [yaml] : fix create different type elements issue (#37585)
2ac0fe431e6 is described below

commit 2ac0fe431e60888eb8222a82880205a4b8bf9e32
Author: Derrick Williams <[email protected]>
AuthorDate: Tue Feb 17 14:09:10 2026 -0500

    [yaml] : fix create different type elements issue (#37585)
    
    * fix mixed types
    
    * add another create pipeline
    
    * remove comment
    
    * address gemini comment about performance
    
    * address doc string and cloud pickle comments
---
 sdks/python/apache_beam/yaml/tests/create.yaml     | 18 ++++++++++++
 sdks/python/apache_beam/yaml/yaml_provider.py      | 32 ++++++++++++++++++++++
 .../apache_beam/yaml/yaml_provider_unit_test.py    | 13 +++++++++
 3 files changed, 63 insertions(+)

diff --git a/sdks/python/apache_beam/yaml/tests/create.yaml b/sdks/python/apache_beam/yaml/tests/create.yaml
index bf346f7667c..6cd7807681c 100644
--- a/sdks/python/apache_beam/yaml/tests/create.yaml
+++ b/sdks/python/apache_beam/yaml/tests/create.yaml
@@ -138,3 +138,21 @@ pipelines:
               - {sdk: MapReduce, year: 2004}
               - {sdk: MillWheel, year: 2008}
 
+
+  # Simple Create with mixed types
+  - pipeline:
+      type: chain
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - 1
+              - {a: 2, c: "hello"}
+              - 3
+        - type: AssertEqual
+          config:
+            elements:
+              - {element: 1, a: null, c: null}
+              - {element: null, a: 2, c: "hello"}
+              - {element: 3, a: null, c: null}
+
diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py b/sdks/python/apache_beam/yaml/yaml_provider.py
index e9882602d10..5a3ccf6b0c2 100755
--- a/sdks/python/apache_beam/yaml/yaml_provider.py
+++ b/sdks/python/apache_beam/yaml/yaml_provider.py
@@ -864,6 +864,19 @@ class YamlProviders:
                 str: "bar"
                  values: [4, 5, 6]
 
+    If the elements are a mix of dicts and non-dicts, the non-dict elements
+    will be wrapped in a Row with a single field "element". For example::
+
+        type: Create
+        config:
+          elements: [1, {"a": 2}]
+
+    will result in an output with two elements with a schema of
+    Row(element=int, a=int) looking like::
+
+        Row(element=1, a=None)
+        Row(element=None, a=2)
+
     Args:
         elements: The set of elements that should belong to the PCollection.
             YAML/JSON-style mappings will be interpreted as Beam rows.
@@ -878,6 +891,25 @@ class YamlProviders:
     if not isinstance(elements, Iterable) or isinstance(elements, (dict, str)):
       raise TypeError('elements must be a list of elements')
 
+    if elements:
+      # If dicts and non-dicts are mixed, wrap non-dicts as {'element': e}.
+      has_dict = False
+      has_non_dict = False
+      for e in elements:
+        if isinstance(e, dict):
+          has_dict = True
+        else:
+          has_non_dict = True
+        if has_dict and has_non_dict:
+          break
+
+      if has_dict and has_non_dict:
+        elements = [
+            e if isinstance(e, dict) else {
+                'element': e
+            } for e in elements
+        ]
+
     # Check if elements have different keys
     updated_elements = elements
     if elements and all(isinstance(e, dict) for e in elements):
diff --git a/sdks/python/apache_beam/yaml/yaml_provider_unit_test.py b/sdks/python/apache_beam/yaml/yaml_provider_unit_test.py
index 1ebae9a3b44..e1e3ee847d9 100644
--- a/sdks/python/apache_beam/yaml/yaml_provider_unit_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_provider_unit_test.py
@@ -364,3 +364,16 @@ class JoinUrlOrFilepathTest(unittest.TestCase):
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()
+
+
+class YamlProvidersCreateTest(unittest.TestCase):
+  def test_create_mixed_types(self):
+    with beam.Pipeline() as p:
+      # A mix of a primitive (Row(element=1)) and a dict (Row(a=2))
+      result = p | YamlProviders.create([1, {"a": 2}])
+      assert_that(
+          result | beam.Map(lambda x: sorted(x._asdict().items())),
+          equal_to([
+              [('a', None), ('element', 1)],
+              [('a', 2), ('element', None)],
+          ]))

Reply via email to