This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b3c317957f2 Add core and missing transforms for yaml integration 
testing - phase 2 (#34654)
b3c317957f2 is described below

commit b3c317957f2ef92e6ced8dd1cf56fcabb077d538
Author: Derrick Williams <[email protected]>
AuthorDate: Tue Apr 22 13:22:39 2025 -0400

    Add core and missing transforms for yaml integration testing - phase 2 
(#34654)
    
    * add explode test
    
    * add filter test
    
    * add flatten test
    
    * add partition test
    
    * add pytransform test
    
    * update yaml_mapping docs
    
    * yapf fix
---
 sdks/python/apache_beam/yaml/tests/explode.yaml    |  79 ++++++
 sdks/python/apache_beam/yaml/tests/filter.yaml     | 147 +++++++++++
 sdks/python/apache_beam/yaml/tests/flatten.yaml    | 125 ++++++++++
 sdks/python/apache_beam/yaml/tests/partition.yaml  | 268 +++++++++++++++++++++
 .../python/apache_beam/yaml/tests/pytransform.yaml | 159 ++++++++++++
 sdks/python/apache_beam/yaml/yaml_mapping.py       |   9 +-
 6 files changed, 784 insertions(+), 3 deletions(-)

diff --git a/sdks/python/apache_beam/yaml/tests/explode.yaml 
b/sdks/python/apache_beam/yaml/tests/explode.yaml
new file mode 100644
index 00000000000..c1e07e51905
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/tests/explode.yaml
@@ -0,0 +1,79 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pipelines:
+  # Simple Explode with a single field
+  - pipeline:
+      type: chain
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - produce: ['Strawberry', 'Corn']
+                season: ['spring', 'summer']
+        - type: Explode
+          name: Flatten lists
+          config:
+            fields: [produce]
+        - type: AssertEqual
+          config:
+            elements:
+              - {produce: 'Strawberry', season: ['spring','summer']}
+              - {produce: 'Corn', season: ['spring', 'summer']}
+
+  # Simple Explode with multiple fields and cross_product=False
+  - pipeline:
+      type: chain
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - produce: ['Strawberry', 'Corn']
+                season: ['spring', 'summer']
+        - type: Explode
+          name: Flatten lists
+          config:
+            fields: [produce, season]
+            cross_product: false
+        - type: AssertEqual
+          config:
+            elements:
+              - {produce: 'Strawberry', season: 'spring'}
+              - {produce: 'Corn', season: 'summer'}
+
+  # Simple Explode with multiple fields and cross_product=True
+  - pipeline:
+      type: chain
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - produce: ['Strawberry', 'Corn']
+                season: ['spring', 'summer']
+        - type: Explode
+          name: Flatten lists
+          config:
+            fields: [produce, season]
+            cross_product: true
+        - type: AssertEqual
+          config:
+            elements:
+              - {produce: 'Strawberry', season: 'spring'}
+              - {produce: 'Strawberry', season: 'summer'}
+              - {produce: 'Corn', season: 'spring'}
+              - {produce: 'Corn', season: 'summer'}
+
diff --git a/sdks/python/apache_beam/yaml/tests/filter.yaml 
b/sdks/python/apache_beam/yaml/tests/filter.yaml
new file mode 100644
index 00000000000..40f0307770d
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/tests/filter.yaml
@@ -0,0 +1,147 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+fixtures:
+  - name: TEMP_DIR
+    type: "tempfile.TemporaryDirectory"
+
+pipelines:
+  # Simple Filter using Python
+  - pipeline:
+      type: chain
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - {transaction_id: "T0012", product_name: "Headphones", 
category: "Electronics", price: 59.99}
+              - {transaction_id: "T5034", product_name: "Leather Jacket", 
category: "Apparel", price: 109.99}
+              - {transaction_id: "T0024", product_name: "Aluminum Mug", 
category: "Kitchen", price: 29.9}
+              - {transaction_id: "T0104", product_name: "Headphones", 
category: "Electronics", price: 59.99}
+              - {transaction_id: "T0302", product_name: "Monitor", category: 
"Electronics", price: 249.99}
+        - type: Filter
+          config:
+            language: python
+            keep: category == "Electronics" and price > 100
+        - type: AssertEqual
+          config:
+            elements:
+              - {transaction_id: "T0302", product_name: "Monitor", category: 
"Electronics", price: 249.99}
+
+  # Simple Filter using Python callable
+  - pipeline:
+      type: chain
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - {transaction_id: "T0012", product_name: "Headphones", 
category: "Electronics", price: 59.99}
+              - {transaction_id: "T5034", product_name: "Leather Jacket", 
category: "Apparel", price: 109.99}
+              - {transaction_id: "T0024", product_name: "Aluminum Mug", 
category: "Kitchen", price: 29.9}
+              - {transaction_id: "T0104", product_name: "Headphones", 
category: "Electronics", price: 59.99}
+              - {transaction_id: "T0302", product_name: "Monitor", category: 
"Electronics", price: 249.99}
+        - type: Filter
+          config:
+            language: python
+            keep:
+              callable: |
+                def my_filter(row):
+                  return row.category == "Electronics" and row.price > 100
+        - type: AssertEqual
+          config:
+            elements:
+              - {transaction_id: "T0302", product_name: "Monitor", category: 
"Electronics", price: 249.99}
+
+  # Simple Filter using Java
+  - pipeline:
+      type: chain
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - {transaction_id: "T0012", product_name: "Headphones", 
category: "Electronics", price: 59.99}
+              - {transaction_id: "T5034", product_name: "Leather Jacket", 
category: "Apparel", price: 109.99}
+              - {transaction_id: "T0024", product_name: "Aluminum Mug", 
category: "Kitchen", price: 29.9}
+              - {transaction_id: "T0104", product_name: "Headphones", 
category: "Electronics", price: 59.99}
+              - {transaction_id: "T0302", product_name: "Monitor", category: 
"Electronics", price: 249.99}
+        - type: Filter
+          config:
+            language: java
+            keep: category.equals("Electronics") && price > 100
+        - type: AssertEqual
+          config:
+            elements:
+              - {transaction_id: "T0302", product_name: "Monitor", category: 
"Electronics", price: 249.99}
+
+  # Simple Filter using Java callable
+  - pipeline:
+      type: chain
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - {transaction_id: "T0012", product_name: "Headphones", 
category: "Electronics", price: 59.99}
+              - {transaction_id: "T5034", product_name: "Leather Jacket", 
category: "Apparel", price: 109.99}
+              - {transaction_id: "T0024", product_name: "Aluminum Mug", 
category: "Kitchen", price: 29.9}
+              - {transaction_id: "T0104", product_name: "Headphones", 
category: "Electronics", price: 59.99}
+              - {transaction_id: "T0302", product_name: "Monitor", category: 
"Electronics", price: 249.99}
+        - type: Filter
+          config:
+            language: java
+            keep:
+              callable: |
+                import org.apache.beam.sdk.values.Row;
+                import java.util.function.Function;
+                public class MyFunction implements Function<Row, Boolean> {
+                  public Boolean apply(Row row) {
+                    return row.getString("category").equals("Electronics") && 
row.getDouble("price") > 100;
+                  }
+                }
+        - type: AssertEqual
+          config:
+            elements:
+              - {transaction_id: "T0302", product_name: "Monitor", category: 
"Electronics", price: 249.99}
+
+
+  # Simple Filter with error handling (`Electronics` is deliberately unquoted)
+  - pipeline:
+      type: composite
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - {category: "Electronics", price: 59.99, product_name: 
"Headphones", transaction_id: "T0012"}
+        - type: Filter
+          name: FilterWithCategory
+          input: Create
+          config:
+            language: python
+            keep: category == Electronics
+            error_handling:
+              output: error_output
+        - type: WriteToJson
+          name: WriteErrorsToJson
+          input: FilterWithCategory.error_output
+          config:
+            path: "{TEMP_DIR}/error.json"
+
+  # Read back the errors written by the previous pipeline (not asserted)
+  - pipeline:
+      type: chain
+      transforms:
+        - type: ReadFromJson
+          config:
+            path: "{TEMP_DIR}/error.json*"
diff --git a/sdks/python/apache_beam/yaml/tests/flatten.yaml 
b/sdks/python/apache_beam/yaml/tests/flatten.yaml
new file mode 100644
index 00000000000..1c375570ca9
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/tests/flatten.yaml
@@ -0,0 +1,125 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pipelines:
+  # Simple Flatten with three inputs
+  - pipeline:
+      type: composite
+      transforms:
+        - type: Create
+          name: CreateA
+          config:
+            elements:
+              - {transaction_id: "T0012", product_name: "Headphones", 
category: "Electronics", price: 59.99}
+              - {transaction_id: "T5034", product_name: "Leather Jacket", 
category: "Apparel", price: 109.99}
+        - type: Create
+          name: CreateB
+          config:
+            elements:
+              - {transaction_id: "T0024", product_name: "Aluminum Mug", 
category: "Kitchen", price: 29.9}
+        - type: Create
+          name: CreateC
+          config:
+            elements:
+              - {transaction_id: "T0104", product_name: "Headphones", 
category: "Electronics", price: 59.99}
+              - {transaction_id: "T0302", product_name: "Monitor", category: 
"Electronics", price: 249.99}
+        - type: Flatten
+          input: [CreateA, CreateB, CreateC]
+        - type: AssertEqual
+          input: Flatten
+          config:
+            elements:
+              - {transaction_id: "T0012", product_name: "Headphones", 
category: "Electronics", price: 59.99}
+              - {transaction_id: "T5034", product_name: "Leather Jacket", 
category: "Apparel", price: 109.99}
+              - {transaction_id: "T0024", product_name: "Aluminum Mug", 
category: "Kitchen", price: 29.9}
+              - {transaction_id: "T0104", product_name: "Headphones", 
category: "Electronics", price: 59.99}
+              - {transaction_id: "T0302", product_name: "Monitor", category: 
"Electronics", price: 249.99}
+
+  # Simple Flatten with one input
+  - pipeline:
+      type: composite
+      transforms:
+        - type: Create
+          name: CreateA
+          config:
+            elements:
+              - {transaction_id: "T0012", product_name: "Headphones", 
category: "Electronics", price: 59.99}
+              - {transaction_id: "T5034", product_name: "Leather Jacket", 
category: "Apparel", price: 109.99}
+              - {transaction_id: "T0024", product_name: "Aluminum Mug", 
category: "Kitchen", price: 29.9}
+        - type: Flatten
+          input: CreateA
+        - type: AssertEqual
+          input: Flatten
+          config:
+            elements:
+              - {transaction_id: "T0012", product_name: "Headphones", 
category: "Electronics", price: 59.99}
+              - {transaction_id: "T5034", product_name: "Leather Jacket", 
category: "Apparel", price: 109.99}
+              - {transaction_id: "T0024", product_name: "Aluminum Mug", 
category: "Kitchen", price: 29.9}
+
+  # Simple Flatten with duplicates in each input
+  - pipeline:
+      type: composite
+      transforms:
+        - type: Create
+          name: CreateA
+          config:
+            elements:
+              - {transaction_id: "T0012", product_name: "Headphones", 
category: "Electronics", price: 59.99}
+              - {transaction_id: "T0012", product_name: "Headphones", 
category: "Electronics", price: 59.99}
+        - type: Create
+          name: CreateB
+          config:
+            elements:
+              - {transaction_id: "T5034", product_name: "Leather Jacket", 
category: "Apparel", price: 109.99}
+              - {transaction_id: "T5034", product_name: "Leather Jacket", 
category: "Apparel", price: 109.99}
+        - type: Flatten
+          input: [CreateA, CreateB]
+        - type: AssertEqual
+          input: Flatten
+          config:
+            elements:
+              - {transaction_id: "T0012", product_name: "Headphones", 
category: "Electronics", price: 59.99}
+              - {transaction_id: "T0012", product_name: "Headphones", 
category: "Electronics", price: 59.99}
+              - {transaction_id: "T5034", product_name: "Leather Jacket", 
category: "Apparel", price: 109.99}
+              - {transaction_id: "T5034", product_name: "Leather Jacket", 
category: "Apparel", price: 109.99}
+
+  # Simple Flatten with duplicates across inputs
+  - pipeline:
+      type: composite
+      transforms:
+        - type: Create
+          name: CreateA
+          config:
+            elements:
+              - {transaction_id: "T0012", product_name: "Headphones", 
category: "Electronics", price: 59.99}
+              - {transaction_id: "T5034", product_name: "Leather Jacket", 
category: "Apparel", price: 109.99}
+        - type: Create
+          name: CreateB
+          config:
+            elements:
+              - {transaction_id: "T5034", product_name: "Leather Jacket", 
category: "Apparel", price: 109.99}
+              - {transaction_id: "T0012", product_name: "Headphones", 
category: "Electronics", price: 59.99}
+        - type: Flatten
+          input: [CreateA, CreateB]
+        - type: AssertEqual
+          input: Flatten
+          config:
+            elements:
+              - {transaction_id: "T0012", product_name: "Headphones", 
category: "Electronics", price: 59.99}
+              - {transaction_id: "T0012", product_name: "Headphones", 
category: "Electronics", price: 59.99}
+              - {transaction_id: "T5034", product_name: "Leather Jacket", 
category: "Apparel", price: 109.99}
+              - {transaction_id: "T5034", product_name: "Leather Jacket", 
category: "Apparel", price: 109.99}
diff --git a/sdks/python/apache_beam/yaml/tests/partition.yaml 
b/sdks/python/apache_beam/yaml/tests/partition.yaml
new file mode 100644
index 00000000000..6cba67425a2
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/tests/partition.yaml
@@ -0,0 +1,268 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+fixtures:
+  - name: TEMP_DIR
+    type: "tempfile.TemporaryDirectory"
+
+pipelines:
+  # Simple Partition into one group with individual values
+  - pipeline:
+      type: composite
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - produce: 'Apple'
+              - produce: 'Apricot'
+              - produce: 'Blueberry'
+              - produce: 'Date'
+              - produce: 'Fig'
+              - produce: 'Mango'
+              - produce: 'Orange'
+              - produce: 'Strawberry'
+        - type: Partition
+          name: PartitionFruits
+          input: Create
+          config:
+            by: "'fruits'"
+            language: python
+            outputs:
+              - "fruits"
+        - type: AssertEqual
+          input: PartitionFruits.fruits
+          config:
+            elements:
+              - produce: 'Apple'
+              - produce: 'Apricot'
+              - produce: 'Blueberry'
+              - produce: 'Date'
+              - produce: 'Fig'
+              - produce: 'Mango'
+              - produce: 'Orange'
+              - produce: 'Strawberry'
+
+  # Simple Partition into two groups with individual values
+  - pipeline:
+      type: composite
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - produce: 'Apple'
+              - produce: 'Apricot'
+              - produce: 'Blueberry'
+              - produce: 'Date'
+              - produce: 'Fig'
+              - produce: 'Mango'
+              - produce: 'Orange'
+              - produce: 'Strawberry'
+        - type: Partition
+          name: PartitionFruits
+          input: Create
+          config:
+            by: "'vowel_fruits' if produce[0].upper() in 'AEIOU' else 
'consonant_fruits'"
+            language: python
+            outputs:
+              - "vowel_fruits"
+              - "consonant_fruits"
+        - type: AssertEqual
+          input: PartitionFruits.vowel_fruits
+          config:
+            elements:
+              - produce: 'Apple'
+              - produce: 'Apricot'
+              - produce: 'Orange'
+        - type: AssertEqual
+          input: PartitionFruits.consonant_fruits
+          config:
+            elements:
+              - produce: 'Blueberry'
+              - produce: 'Date'
+              - produce: 'Fig'
+              - produce: 'Mango'
+              - produce: 'Strawberry'
+
+  # Simple Partition into two groups with lists of values
+  - pipeline:
+      type: composite
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - produce: ['Grapefruit', 'Cantaloupe', 'Fig', 'Raspberry', 
'Lemon', 'Peach', 'Watermelon']
+              - produce: ['Strawberry', 'Kiwi', 'Lime', 'Pear', 'Plum', 
'Orange', 'Pomegranate', 'Cranberry']
+              - produce: ['Banana']
+              - produce: ['Apricot', 'Blackberry', 'Mango', 'Apple', 
'Blueberry']
+              - produce: ['Cherry', 'Grape']
+        - type: Partition
+          name: PartitionFruits
+          input: Create
+          config:
+            by: "'too_much_fruits' if len(produce) > 4 else 
'just_right_fruits'"
+            language: python
+            outputs:
+              - "too_much_fruits"
+              - "just_right_fruits"
+        - type: AssertEqual
+          input: PartitionFruits.too_much_fruits
+          config:
+            elements:
+              - produce: ['Grapefruit', 'Cantaloupe', 'Fig', 'Raspberry', 
'Lemon', 'Peach', 'Watermelon']
+              - produce: ['Strawberry', 'Kiwi', 'Lime', 'Pear', 'Plum', 
'Orange', 'Pomegranate', 'Cranberry']
+              - produce: ['Apricot', 'Blackberry', 'Mango', 'Apple', 
'Blueberry']
+        - type: AssertEqual
+          input: PartitionFruits.just_right_fruits
+          config:
+            elements:
+              - produce: ['Banana']
+              - produce: ['Cherry', 'Grape']
+
+  # Partition lists of values twice in sequence: by size, then by yummyness
+  - pipeline:
+      type: composite
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - produce: ['Grapefruit', 'Cantaloupe', 'Fig', 'Raspberry', 
'Lemon', 'Peach', 'Watermelon']
+              - produce: ['Strawberry', 'Kiwi', 'Lime', 'Pear', 'Plum', 
'Orange', 'Pomegranate', 'Cranberry']
+              - produce: ['Banana']
+              - produce: ['Apricot', 'Blackberry', 'Mango', 'Apple', 
'Blueberry']
+              - produce: ['Cherry', 'Grape']
+        - type: Partition
+          name: PartitionFruitsSize
+          input: Create
+          config:
+            by: "'too_much_fruits' if len(produce) > 4 else 
'just_right_fruits'"
+            language: python
+            outputs:
+              - "too_much_fruits"
+              - "just_right_fruits"
+        - type: Partition
+          name: PartitionFruitsYummyness
+          input: PartitionFruitsSize.too_much_fruits
+          config:
+            by:
+              callable: |
+                def partition_by_yummyness(row):
+                  produce_list = row.produce
+                  if any(fruit[0].upper() in 'AEIOU' for fruit in 
produce_list):
+                    return 'yummy'
+                  else:
+                    return 'not_yummy'            
+            language: python
+            outputs:
+              - "yummy"
+              - "not_yummy"
+        - type: AssertEqual
+          input: PartitionFruitsYummyness.yummy
+          config:
+            elements:
+              - produce: ['Strawberry', 'Kiwi', 'Lime', 'Pear', 'Plum', 
'Orange', 'Pomegranate', 'Cranberry']
+              - produce: ['Apricot', 'Blackberry', 'Mango', 'Apple', 
'Blueberry']
+        - type: AssertEqual
+          input: PartitionFruitsYummyness.not_yummy
+          config:
+            elements:
+              - produce: ['Grapefruit', 'Cantaloupe', 'Fig', 'Raspberry', 
'Lemon', 'Peach', 'Watermelon']
+
+
+  # Partition individual values into two named outputs plus unknown_output
+  - pipeline:
+      type: composite
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - produce: 'Apple'
+              - produce: 'Apricot'
+              - produce: 'Blueberry'
+              - produce: 'Date'
+              - produce: 'Fig'
+              - produce: 'Mango'
+              - produce: 'Orange'
+              - produce: 'Strawberry'
+        - type: Partition
+          name: PartitionFruits
+          input: Create
+          config:
+            by:
+              callable: |
+                def partition(row):
+                  if row.produce[0].upper() in 'AEIOU':
+                    return 'vowel_fruits'
+                  elif row.produce in ['Date', 'Mango']:
+                    return 'yummy_fruits'
+            language: python
+            outputs:
+              - "vowel_fruits"
+              - "yummy_fruits"
+            unknown_output: "unknown_fruits"
+        - type: AssertEqual
+          input: PartitionFruits.vowel_fruits
+          config:
+            elements:
+              - produce: 'Apple'
+              - produce: 'Apricot'
+              - produce: 'Orange'
+        - type: AssertEqual
+          input: PartitionFruits.yummy_fruits
+          config:
+            elements:
+              - produce: 'Date'
+              - produce: 'Mango'
+        - type: AssertEqual
+          input: PartitionFruits.unknown_fruits
+          config:
+            elements:
+              - produce: 'Blueberry'
+              - produce: 'Fig'
+              - produce: 'Strawberry'
+
+
+  # Simple Partition with error handling (`produce[100]` raises IndexError)
+  - pipeline:
+      type: composite
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - produce: 'Apple'
+              - produce: 'Apricot'
+              - produce: 'Blueberry'
+              - produce: 'Date'
+              - produce: 'Fig'
+              - produce: 'Mango'
+              - produce: 'Orange'
+              - produce: 'Strawberry'
+        - type: Partition
+          name: PartitionFruits
+          input: Create
+          config:
+            by: "'vowel_fruits' if produce[100] in 'AEIOU' else 
'consonant_fruits'"
+            language: python
+            outputs:
+              - "vowel_fruits"
+              - "consonant_fruits"
+            error_handling:
+              output: error_output
+        - type: WriteToJson
+          name: WriteErrorsToJson
+          input: PartitionFruits.error_output
+          config:
+            path: "{TEMP_DIR}/error.json"
diff --git a/sdks/python/apache_beam/yaml/tests/pytransform.yaml 
b/sdks/python/apache_beam/yaml/tests/pytransform.yaml
new file mode 100644
index 00000000000..241a16fd7c6
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/tests/pytransform.yaml
@@ -0,0 +1,159 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+fixtures:
+  - name: TEMP_DIR
+    type: "tempfile.TemporaryDirectory"
+
+pipelines:
+  # Simple PyTransform using __constructor__
+  - pipeline:
+      type: chain
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - {price: 1.29, produce: 'Apple'}
+              - {price: 0.29, produce: 'Apricot'}
+              - {price: 0.02, produce: 'Blueberry'}
+              - {price: 0.19, produce: 'Date'}
+        - type: PyTransform
+          config:
+            constructor: __constructor__
+            kwargs:
+              source: |
+                def increase(inc):
+                  return beam.Map(lambda x: beam.Row(price=x.price + inc, 
produce=x.produce))
+              inc: 0.30
+        - type: AssertEqual
+          config:
+            elements:
+              - {price: 1.59, produce: 'Apple'}
+              - {price: 0.59, produce: 'Apricot'}
+              - {price: 0.32, produce: 'Blueberry'}
+              - {price: 0.49, produce: 'Date'}
+
+  # Simple PyTransform using __constructor__ beam.PTransform
+  - pipeline:
+      type: chain
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - {price: 1.29, produce: 'Apple'}
+              - {price: 0.29, produce: 'Apricot'}
+              - {price: 0.02, produce: 'Blueberry'}
+              - {price: 0.19, produce: 'Date'}
+        - type: PyTransform
+          config:
+            constructor: __constructor__
+            kwargs:
+              source: |
+                class MyPTransform(beam.PTransform):
+                  def __init__(self, inc):
+                    self._inc = inc
+                  def expand(self, pcoll):
+                    return pcoll | beam.Map(lambda x: beam.Row(price=x.price + 
self._inc, produce=x.produce))
+              inc: 0.30
+        - type: AssertEqual
+          config:
+            elements:
+              - {price: 1.59, produce: 'Apple'}
+              - {price: 0.59, produce: 'Apricot'}
+              - {price: 0.32, produce: 'Blueberry'}
+              - {price: 0.49, produce: 'Date'}
+
+  # Simple PyTransform using __callable__
+  - pipeline:
+      type: chain
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - {price: 1.29, produce: 'Apple'}
+              - {price: 0.29, produce: 'Apricot'}
+              - {price: 0.02, produce: 'Blueberry'}
+              - {price: 0.19, produce: 'Date'}
+        - type: PyTransform
+          config:
+            constructor: __callable__
+            kwargs:
+              source: |
+                def increase(pcoll, inc):
+                  return pcoll | beam.Map(lambda x: beam.Row(price=x.price + 
inc, produce=x.produce))
+              inc: 0.30
+        - type: AssertEqual
+          config:
+            elements:
+              - {price: 1.59, produce: 'Apple'}
+              - {price: 0.59, produce: 'Apricot'}
+              - {price: 0.32, produce: 'Blueberry'}
+              - {price: 0.49, produce: 'Date'}
+
+  # Write a CSV file that the ReadFromCsv PyTransform pipelines below read
+  - pipeline:
+      type: chain
+      transforms:
+        - type: Create
+          config:
+            elements:
+              - {price: 1.29, produce: 'Apple'}
+              - {price: 0.29, produce: 'Apricot'}
+              - {price: 0.02, produce: 'Blueberry'}
+              - {price: 0.19, produce: 'Date'}
+        - type: WriteToCsv
+          config:
+            path: "{TEMP_DIR}/out.csv"
+
+  # Simple PyTransform using ReadFromCsv with args
+  - pipeline:
+      type: composite
+      transforms:
+        - type: PyTransform
+          name: PyTransformReadFromCsv
+          input: {}
+          config:
+            constructor: apache_beam.io.ReadFromCsv
+            args: ['{TEMP_DIR}/out.csv*']
+        - type: AssertEqual
+          input: PyTransformReadFromCsv
+          config:
+            elements:
+              - {price: 1.29, produce: 'Apple'}
+              - {price: 0.29, produce: 'Apricot'}
+              - {price: 0.02, produce: 'Blueberry'}
+              - {price: 0.19, produce: 'Date'}
+
+  # Simple PyTransform using ReadFromCsv with kwargs
+  - pipeline:
+      type: composite
+      transforms:
+        - type: PyTransform
+          name: PyTransformReadFromCsv
+          input: {}
+          config:
+            constructor: apache_beam.io.ReadFromCsv
+            kwargs:
+              path: '{TEMP_DIR}/out.csv*'
+        - type: AssertEqual
+          input: PyTransformReadFromCsv
+          config:
+            elements:
+              - {price: 1.29, produce: 'Apple'}
+              - {price: 0.29, produce: 'Apricot'}
+              - {price: 0.02, produce: 'Blueberry'}
+              - {price: 0.19, produce: 'Date'}
diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py 
b/sdks/python/apache_beam/yaml/yaml_mapping.py
index 5faed9af454..9985e9756fd 100644
--- a/sdks/python/apache_beam/yaml/yaml_mapping.py
+++ b/sdks/python/apache_beam/yaml/yaml_mapping.py
@@ -564,7 +564,10 @@ class _Explode(beam.PTransform):
         cross_product = True
     self._fields = fields
     self._cross_product = cross_product
-    # TODO(yaml): Support standard error handling argument.
+    # TODO(yaml):
+    # 1. Support standard error handling argument.
+    # 2. The error_handling parameter reportedly is not accepted at execution
+    #    time; this needs further investigation.
     self._exception_handling_args = exception_handling_args(error_handling)
 
   @maybe_with_exception_handling
@@ -750,7 +753,7 @@ def _Partition(
     outputs: list[str],
     unknown_output: Optional[str] = None,
     error_handling: Optional[Mapping[str, Any]] = None,
-    language: Optional[str] = 'generic'):
+    language: str = 'generic'):
   """Splits an input into several distinct outputs.
 
   Each input element will go to a distinct output based on the field or
@@ -767,7 +770,7 @@ def _Partition(
         parameter.
       error_handling: (Optional) Whether and how to handle errors during
         partitioning.
-      language: (Optional) The language of the `by` expression.
+      language: The language of the `by` expression.
   """
   split_fn = _as_callable_for_pcoll(pcoll, by, 'by', language)
   try:

Reply via email to