This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b3c317957f2 Add core and missing transformers for yaml integration
testing - phase 2sh (#34654)
b3c317957f2 is described below
commit b3c317957f2ef92e6ced8dd1cf56fcabb077d538
Author: Derrick Williams <[email protected]>
AuthorDate: Tue Apr 22 13:22:39 2025 -0400
Add core and missing transformers for yaml integration testing - phase 2sh
(#34654)
* add explode test
* add filter test
* add flatten test
* add partition test
* add pytransform test
* update yaml_mapping docs
* yapf fix
---
sdks/python/apache_beam/yaml/tests/explode.yaml | 79 ++++++
sdks/python/apache_beam/yaml/tests/filter.yaml | 147 +++++++++++
sdks/python/apache_beam/yaml/tests/flatten.yaml | 125 ++++++++++
sdks/python/apache_beam/yaml/tests/partition.yaml | 268 +++++++++++++++++++++
.../python/apache_beam/yaml/tests/pytransform.yaml | 159 ++++++++++++
sdks/python/apache_beam/yaml/yaml_mapping.py | 9 +-
6 files changed, 784 insertions(+), 3 deletions(-)
diff --git a/sdks/python/apache_beam/yaml/tests/explode.yaml
b/sdks/python/apache_beam/yaml/tests/explode.yaml
new file mode 100644
index 00000000000..c1e07e51905
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/tests/explode.yaml
@@ -0,0 +1,79 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pipelines:
+ # Simple Explode with single field
+ - pipeline:
+ type: chain
+ transforms:
+ - type: Create
+ config:
+ elements:
+ - produce: ['Strawberry', 'Corn']
+ season: ['spring', 'summer']
+ - type: Explode
+ name: Flatten lists
+ config:
+ fields: [produce]
+ - type: AssertEqual
+ config:
+ elements:
+ - {produce: 'Strawberry', season: ['spring','summer']}
+ - {produce: 'Corn', season: ['spring', 'summer']}
+
+ # Simple Explode with multiple fields and cross_product=False
+ - pipeline:
+ type: chain
+ transforms:
+ - type: Create
+ config:
+ elements:
+ - produce: ['Strawberry', 'Corn']
+ season: ['spring', 'summer']
+ - type: Explode
+ name: Flatten lists
+ config:
+ fields: [produce, season]
+ cross_product: false
+ - type: AssertEqual
+ config:
+ elements:
+ - {produce: 'Strawberry', season: 'spring'}
+ - {produce: 'Corn', season: 'summer'}
+
+ # Simple Explode with multiple fields and cross_product=True
+ - pipeline:
+ type: chain
+ transforms:
+ - type: Create
+ config:
+ elements:
+ - produce: ['Strawberry', 'Corn']
+ season: ['spring', 'summer']
+ - type: Explode
+ name: Flatten lists
+ config:
+ fields: [produce, season]
+ cross_product: true
+ - type: AssertEqual
+ config:
+ elements:
+ - {produce: 'Strawberry', season: 'spring'}
+ - {produce: 'Strawberry', season: 'summer'}
+ - {produce: 'Corn', season: 'spring'}
+ - {produce: 'Corn', season: 'summer'}
+
diff --git a/sdks/python/apache_beam/yaml/tests/filter.yaml
b/sdks/python/apache_beam/yaml/tests/filter.yaml
new file mode 100644
index 00000000000..40f0307770d
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/tests/filter.yaml
@@ -0,0 +1,147 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+fixtures:
+ - name: TEMP_DIR
+ type: "tempfile.TemporaryDirectory"
+
+pipelines:
+ # Simple Filter using Python
+ - pipeline:
+ type: chain
+ transforms:
+ - type: Create
+ config:
+ elements:
+ - {transaction_id: "T0012", product_name: "Headphones",
category: "Electronics", price: 59.99}
+ - {transaction_id: "T5034", product_name: "Leather Jacket",
category: "Apparel", price: 109.99}
+ - {transaction_id: "T0024", product_name: "Aluminum Mug",
category: "Kitchen", price: 29.9}
+ - {transaction_id: "T0104", product_name: "Headphones",
category: "Electronics", price: 59.99}
+ - {transaction_id: "T0302", product_name: "Monitor", category:
"Electronics", price: 249.99}
+ - type: Filter
+ config:
+ language: python
+ keep: category == "Electronics" and price > 100
+ - type: AssertEqual
+ config:
+ elements:
+ - {transaction_id: "T0302", product_name: "Monitor", category:
"Electronics", price: 249.99}
+
+ # Simple Filter using Python callable
+ - pipeline:
+ type: chain
+ transforms:
+ - type: Create
+ config:
+ elements:
+ - {transaction_id: "T0012", product_name: "Headphones",
category: "Electronics", price: 59.99}
+ - {transaction_id: "T5034", product_name: "Leather Jacket",
category: "Apparel", price: 109.99}
+ - {transaction_id: "T0024", product_name: "Aluminum Mug",
category: "Kitchen", price: 29.9}
+ - {transaction_id: "T0104", product_name: "Headphones",
category: "Electronics", price: 59.99}
+ - {transaction_id: "T0302", product_name: "Monitor", category:
"Electronics", price: 249.99}
+ - type: Filter
+ config:
+ language: python
+ keep:
+ callable: |
+ def my_filter(row):
+ return row.category == "Electronics" and row.price > 100
+ - type: AssertEqual
+ config:
+ elements:
+ - {transaction_id: "T0302", product_name: "Monitor", category:
"Electronics", price: 249.99}
+
+ # Simple Filter using Java
+ - pipeline:
+ type: chain
+ transforms:
+ - type: Create
+ config:
+ elements:
+ - {transaction_id: "T0012", product_name: "Headphones",
category: "Electronics", price: 59.99}
+ - {transaction_id: "T5034", product_name: "Leather Jacket",
category: "Apparel", price: 109.99}
+ - {transaction_id: "T0024", product_name: "Aluminum Mug",
category: "Kitchen", price: 29.9}
+ - {transaction_id: "T0104", product_name: "Headphones",
category: "Electronics", price: 59.99}
+ - {transaction_id: "T0302", product_name: "Monitor", category:
"Electronics", price: 249.99}
+ - type: Filter
+ config:
+ language: java
+ keep: category.equals("Electronics") && price > 100
+ - type: AssertEqual
+ config:
+ elements:
+ - {transaction_id: "T0302", product_name: "Monitor", category:
"Electronics", price: 249.99}
+
+ # Simple Filter using Java callable
+ - pipeline:
+ type: chain
+ transforms:
+ - type: Create
+ config:
+ elements:
+ - {transaction_id: "T0012", product_name: "Headphones",
category: "Electronics", price: 59.99}
+ - {transaction_id: "T5034", product_name: "Leather Jacket",
category: "Apparel", price: 109.99}
+ - {transaction_id: "T0024", product_name: "Aluminum Mug",
category: "Kitchen", price: 29.9}
+ - {transaction_id: "T0104", product_name: "Headphones",
category: "Electronics", price: 59.99}
+ - {transaction_id: "T0302", product_name: "Monitor", category:
"Electronics", price: 249.99}
+ - type: Filter
+ config:
+ language: java
+ keep:
+ callable: |
+ import org.apache.beam.sdk.values.Row;
+ import java.util.function.Function;
+ public class MyFunction implements Function<Row, Boolean> {
+ public Boolean apply(Row row) {
+ return row.getString("category").equals("Electronics") &&
row.getDouble("price") > 100;
+ }
+ }
+ - type: AssertEqual
+ config:
+ elements:
+ - {transaction_id: "T0302", product_name: "Monitor", category:
"Electronics", price: 249.99}
+
+
+ # Filter with error handling: the unquoted Electronics is an undefined name
+ - pipeline:
+ type: composite
+ transforms:
+ - type: Create
+ config:
+ elements:
+ - {category: "Electronics", price: 59.99, product_name:
"Headphones", transaction_id: "T0012"}
+ - type: Filter
+ name: FilterWithCategory
+ input: Create
+ config:
+ language: python
+ keep: category == Electronics
+ error_handling:
+ output: error_output
+ - type: WriteToJson
+ name: WriteErrorsToJson
+ input: FilterWithCategory.error_output
+ config:
+ path: "{TEMP_DIR}/error.json"
+
+ # Read back the errors written by the previous pipeline (not asserted)
+ - pipeline:
+ type: chain
+ transforms:
+ - type: ReadFromJson
+ config:
+ path: "{TEMP_DIR}/error.json*"
diff --git a/sdks/python/apache_beam/yaml/tests/flatten.yaml
b/sdks/python/apache_beam/yaml/tests/flatten.yaml
new file mode 100644
index 00000000000..1c375570ca9
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/tests/flatten.yaml
@@ -0,0 +1,125 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+pipelines:
+ # Simple Flatten with three inputs
+ - pipeline:
+ type: composite
+ transforms:
+ - type: Create
+ name: CreateA
+ config:
+ elements:
+ - {transaction_id: "T0012", product_name: "Headphones",
category: "Electronics", price: 59.99}
+ - {transaction_id: "T5034", product_name: "Leather Jacket",
category: "Apparel", price: 109.99}
+ - type: Create
+ name: CreateB
+ config:
+ elements:
+ - {transaction_id: "T0024", product_name: "Aluminum Mug",
category: "Kitchen", price: 29.9}
+ - type: Create
+ name: CreateC
+ config:
+ elements:
+ - {transaction_id: "T0104", product_name: "Headphones",
category: "Electronics", price: 59.99}
+ - {transaction_id: "T0302", product_name: "Monitor", category:
"Electronics", price: 249.99}
+ - type: Flatten
+ input: [CreateA, CreateB, CreateC]
+ - type: AssertEqual
+ input: Flatten
+ config:
+ elements:
+ - {transaction_id: "T0012", product_name: "Headphones",
category: "Electronics", price: 59.99}
+ - {transaction_id: "T5034", product_name: "Leather Jacket",
category: "Apparel", price: 109.99}
+ - {transaction_id: "T0024", product_name: "Aluminum Mug",
category: "Kitchen", price: 29.9}
+ - {transaction_id: "T0104", product_name: "Headphones",
category: "Electronics", price: 59.99}
+ - {transaction_id: "T0302", product_name: "Monitor", category:
"Electronics", price: 249.99}
+
+ # Simple Flatten with one input
+ - pipeline:
+ type: composite
+ transforms:
+ - type: Create
+ name: CreateA
+ config:
+ elements:
+ - {transaction_id: "T0012", product_name: "Headphones",
category: "Electronics", price: 59.99}
+ - {transaction_id: "T5034", product_name: "Leather Jacket",
category: "Apparel", price: 109.99}
+ - {transaction_id: "T0024", product_name: "Aluminum Mug",
category: "Kitchen", price: 29.9}
+ - type: Flatten
+ input: CreateA
+ - type: AssertEqual
+ input: Flatten
+ config:
+ elements:
+ - {transaction_id: "T0012", product_name: "Headphones",
category: "Electronics", price: 59.99}
+ - {transaction_id: "T5034", product_name: "Leather Jacket",
category: "Apparel", price: 109.99}
+ - {transaction_id: "T0024", product_name: "Aluminum Mug",
category: "Kitchen", price: 29.9}
+
+ # Simple Flatten with duplicates in each input
+ - pipeline:
+ type: composite
+ transforms:
+ - type: Create
+ name: CreateA
+ config:
+ elements:
+ - {transaction_id: "T0012", product_name: "Headphones",
category: "Electronics", price: 59.99}
+ - {transaction_id: "T0012", product_name: "Headphones",
category: "Electronics", price: 59.99}
+ - type: Create
+ name: CreateB
+ config:
+ elements:
+ - {transaction_id: "T5034", product_name: "Leather Jacket",
category: "Apparel", price: 109.99}
+ - {transaction_id: "T5034", product_name: "Leather Jacket",
category: "Apparel", price: 109.99}
+ - type: Flatten
+ input: [CreateA, CreateB]
+ - type: AssertEqual
+ input: Flatten
+ config:
+ elements:
+ - {transaction_id: "T0012", product_name: "Headphones",
category: "Electronics", price: 59.99}
+ - {transaction_id: "T0012", product_name: "Headphones",
category: "Electronics", price: 59.99}
+ - {transaction_id: "T5034", product_name: "Leather Jacket",
category: "Apparel", price: 109.99}
+ - {transaction_id: "T5034", product_name: "Leather Jacket",
category: "Apparel", price: 109.99}
+
+ # Simple Flatten with duplicates across inputs
+ - pipeline:
+ type: composite
+ transforms:
+ - type: Create
+ name: CreateA
+ config:
+ elements:
+ - {transaction_id: "T0012", product_name: "Headphones",
category: "Electronics", price: 59.99}
+ - {transaction_id: "T5034", product_name: "Leather Jacket",
category: "Apparel", price: 109.99}
+ - type: Create
+ name: CreateB
+ config:
+ elements:
+ - {transaction_id: "T5034", product_name: "Leather Jacket",
category: "Apparel", price: 109.99}
+ - {transaction_id: "T0012", product_name: "Headphones",
category: "Electronics", price: 59.99}
+ - type: Flatten
+ input: [CreateA, CreateB]
+ - type: AssertEqual
+ input: Flatten
+ config:
+ elements:
+ - {transaction_id: "T0012", product_name: "Headphones",
category: "Electronics", price: 59.99}
+ - {transaction_id: "T0012", product_name: "Headphones",
category: "Electronics", price: 59.99}
+ - {transaction_id: "T5034", product_name: "Leather Jacket",
category: "Apparel", price: 109.99}
+ - {transaction_id: "T5034", product_name: "Leather Jacket",
category: "Apparel", price: 109.99}
diff --git a/sdks/python/apache_beam/yaml/tests/partition.yaml
b/sdks/python/apache_beam/yaml/tests/partition.yaml
new file mode 100644
index 00000000000..6cba67425a2
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/tests/partition.yaml
@@ -0,0 +1,268 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+fixtures:
+ - name: TEMP_DIR
+ type: "tempfile.TemporaryDirectory"
+
+pipelines:
+ # Simple Partition into one group with individual values
+ - pipeline:
+ type: composite
+ transforms:
+ - type: Create
+ config:
+ elements:
+ - produce: 'Apple'
+ - produce: 'Apricot'
+ - produce: 'Blueberry'
+ - produce: 'Date'
+ - produce: 'Fig'
+ - produce: 'Mango'
+ - produce: 'Orange'
+ - produce: 'Strawberry'
+ - type: Partition
+ name: PartitionFruits
+ input: Create
+ config:
+ by: "'fruits'"
+ language: python
+ outputs:
+ - "fruits"
+ - type: AssertEqual
+ input: PartitionFruits.fruits
+ config:
+ elements:
+ - produce: 'Apple'
+ - produce: 'Apricot'
+ - produce: 'Blueberry'
+ - produce: 'Date'
+ - produce: 'Fig'
+ - produce: 'Mango'
+ - produce: 'Orange'
+ - produce: 'Strawberry'
+
+ # Simple Partition into two groups with individual values
+ - pipeline:
+ type: composite
+ transforms:
+ - type: Create
+ config:
+ elements:
+ - produce: 'Apple'
+ - produce: 'Apricot'
+ - produce: 'Blueberry'
+ - produce: 'Date'
+ - produce: 'Fig'
+ - produce: 'Mango'
+ - produce: 'Orange'
+ - produce: 'Strawberry'
+ - type: Partition
+ name: PartitionFruits
+ input: Create
+ config:
+ by: "'vowel_fruits' if produce[0].upper() in 'AEIOU' else
'consonant_fruits'"
+ language: python
+ outputs:
+ - "vowel_fruits"
+ - "consonant_fruits"
+ - type: AssertEqual
+ input: PartitionFruits.vowel_fruits
+ config:
+ elements:
+ - produce: 'Apple'
+ - produce: 'Apricot'
+ - produce: 'Orange'
+ - type: AssertEqual
+ input: PartitionFruits.consonant_fruits
+ config:
+ elements:
+ - produce: 'Blueberry'
+ - produce: 'Date'
+ - produce: 'Fig'
+ - produce: 'Mango'
+ - produce: 'Strawberry'
+
+ # Simple Partition into two groups with lists of values
+ - pipeline:
+ type: composite
+ transforms:
+ - type: Create
+ config:
+ elements:
+ - produce: ['Grapefruit', 'Cantaloupe', 'Fig', 'Raspberry',
'Lemon', 'Peach', 'Watermelon']
+ - produce: ['Strawberry', 'Kiwi', 'Lime', 'Pear', 'Plum',
'Orange', 'Pomegranate', 'Cranberry']
+ - produce: ['Banana']
+ - produce: ['Apricot', 'Blackberry', 'Mango', 'Apple',
'Blueberry']
+ - produce: ['Cherry', 'Grape']
+ - type: Partition
+ name: PartitionFruits
+ input: Create
+ config:
+ by: "'too_much_fruits' if len(produce) > 4 else
'just_right_fruits'"
+ language: python
+ outputs:
+ - "too_much_fruits"
+ - "just_right_fruits"
+ - type: AssertEqual
+ input: PartitionFruits.too_much_fruits
+ config:
+ elements:
+ - produce: ['Grapefruit', 'Cantaloupe', 'Fig', 'Raspberry',
'Lemon', 'Peach', 'Watermelon']
+ - produce: ['Strawberry', 'Kiwi', 'Lime', 'Pear', 'Plum',
'Orange', 'Pomegranate', 'Cranberry']
+ - produce: ['Apricot', 'Blackberry', 'Mango', 'Apple',
'Blueberry']
+ - type: AssertEqual
+ input: PartitionFruits.just_right_fruits
+ config:
+ elements:
+ - produce: ['Banana']
+ - produce: ['Cherry', 'Grape']
+
+ # Partition lists of values into two groups, then partition one group again
+ - pipeline:
+ type: composite
+ transforms:
+ - type: Create
+ config:
+ elements:
+ - produce: ['Grapefruit', 'Cantaloupe', 'Fig', 'Raspberry',
'Lemon', 'Peach', 'Watermelon']
+ - produce: ['Strawberry', 'Kiwi', 'Lime', 'Pear', 'Plum',
'Orange', 'Pomegranate', 'Cranberry']
+ - produce: ['Banana']
+ - produce: ['Apricot', 'Blackberry', 'Mango', 'Apple',
'Blueberry']
+ - produce: ['Cherry', 'Grape']
+ - type: Partition
+ name: PartitionFruitsSize
+ input: Create
+ config:
+ by: "'too_much_fruits' if len(produce) > 4 else
'just_right_fruits'"
+ language: python
+ outputs:
+ - "too_much_fruits"
+ - "just_right_fruits"
+ - type: Partition
+ name: PartitionFruitsYummyness
+ input: PartitionFruitsSize.too_much_fruits
+ config:
+ by:
+ callable: |
+ def partition_by_yummyness(row):
+ produce_list = row.produce
+ if any(fruit[0].upper() in 'AEIOU' for fruit in
produce_list):
+ return 'yummy'
+ else:
+ return 'not_yummy'
+ language: python
+ outputs:
+ - "yummy"
+ - "not_yummy"
+ - type: AssertEqual
+ input: PartitionFruitsYummyness.yummy
+ config:
+ elements:
+ - produce: ['Strawberry', 'Kiwi', 'Lime', 'Pear', 'Plum',
'Orange', 'Pomegranate', 'Cranberry']
+ - produce: ['Apricot', 'Blackberry', 'Mango', 'Apple',
'Blueberry']
+ - type: AssertEqual
+ input: PartitionFruitsYummyness.not_yummy
+ config:
+ elements:
+ - produce: ['Grapefruit', 'Cantaloupe', 'Fig', 'Raspberry',
'Lemon', 'Peach', 'Watermelon']
+
+
+ # Simple Partition into two named groups plus an unknown_output group
+ - pipeline:
+ type: composite
+ transforms:
+ - type: Create
+ config:
+ elements:
+ - produce: 'Apple'
+ - produce: 'Apricot'
+ - produce: 'Blueberry'
+ - produce: 'Date'
+ - produce: 'Fig'
+ - produce: 'Mango'
+ - produce: 'Orange'
+ - produce: 'Strawberry'
+ - type: Partition
+ name: PartitionFruits
+ input: Create
+ config:
+ by:
+ callable: |
+ def partition(row):
+ if row.produce[0].upper() in 'AEIOU':
+ return 'vowel_fruits'
+ elif row.produce in ['Date', 'Mango']:
+ return 'yummy_fruits'
+ language: python
+ outputs:
+ - "vowel_fruits"
+ - "yummy_fruits"
+ unknown_output: "unknown_fruits"
+ - type: AssertEqual
+ input: PartitionFruits.vowel_fruits
+ config:
+ elements:
+ - produce: 'Apple'
+ - produce: 'Apricot'
+ - produce: 'Orange'
+ - type: AssertEqual
+ input: PartitionFruits.yummy_fruits
+ config:
+ elements:
+ - produce: 'Date'
+ - produce: 'Mango'
+ - type: AssertEqual
+ input: PartitionFruits.unknown_fruits
+ config:
+ elements:
+ - produce: 'Blueberry'
+ - produce: 'Fig'
+ - produce: 'Strawberry'
+
+
+ # Simple Partition with error handling (produce[100] is out of range)
+ - pipeline:
+ type: composite
+ transforms:
+ - type: Create
+ config:
+ elements:
+ - produce: 'Apple'
+ - produce: 'Apricot'
+ - produce: 'Blueberry'
+ - produce: 'Date'
+ - produce: 'Fig'
+ - produce: 'Mango'
+ - produce: 'Orange'
+ - produce: 'Strawberry'
+ - type: Partition
+ name: PartitionFruits
+ input: Create
+ config:
+ by: "'vowel_fruits' if produce[100] in 'AEIOU' else
'consonant_fruits'"
+ language: python
+ outputs:
+ - "vowel_fruits"
+ - "consonant_fruits"
+ error_handling:
+ output: error_output
+ - type: WriteToJson
+ name: WriteErrorsToJson
+ input: PartitionFruits.error_output
+ config:
+ path: "{TEMP_DIR}/error.json"
diff --git a/sdks/python/apache_beam/yaml/tests/pytransform.yaml
b/sdks/python/apache_beam/yaml/tests/pytransform.yaml
new file mode 100644
index 00000000000..241a16fd7c6
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/tests/pytransform.yaml
@@ -0,0 +1,159 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+fixtures:
+ - name: TEMP_DIR
+ type: "tempfile.TemporaryDirectory"
+
+pipelines:
+ # Simple PyTransform using __constructor__
+ - pipeline:
+ type: chain
+ transforms:
+ - type: Create
+ config:
+ elements:
+ - {price: 1.29, produce: 'Apple'}
+ - {price: 0.29, produce: 'Apricot'}
+ - {price: 0.02, produce: 'Blueberry'}
+ - {price: 0.19, produce: 'Date'}
+ - type: PyTransform
+ config:
+ constructor: __constructor__
+ kwargs:
+ source: |
+ def increase(inc):
+ return beam.Map(lambda x: beam.Row(price=x.price + inc,
produce=x.produce))
+ inc: 0.30
+ - type: AssertEqual
+ config:
+ elements:
+ - {price: 1.59, produce: 'Apple'}
+ - {price: 0.59, produce: 'Apricot'}
+ - {price: 0.32, produce: 'Blueberry'}
+ - {price: 0.49, produce: 'Date'}
+
+ # Simple PyTransform using __constructor__ with a beam.PTransform subclass
+ - pipeline:
+ type: chain
+ transforms:
+ - type: Create
+ config:
+ elements:
+ - {price: 1.29, produce: 'Apple'}
+ - {price: 0.29, produce: 'Apricot'}
+ - {price: 0.02, produce: 'Blueberry'}
+ - {price: 0.19, produce: 'Date'}
+ - type: PyTransform
+ config:
+ constructor: __constructor__
+ kwargs:
+ source: |
+ class MyPTransform(beam.PTransform):
+ def __init__(self, inc):
+ self._inc = inc
+ def expand(self, pcoll):
+ return pcoll | beam.Map(lambda x: beam.Row(price=x.price +
self._inc, produce=x.produce))
+ inc: 0.30
+ - type: AssertEqual
+ config:
+ elements:
+ - {price: 1.59, produce: 'Apple'}
+ - {price: 0.59, produce: 'Apricot'}
+ - {price: 0.32, produce: 'Blueberry'}
+ - {price: 0.49, produce: 'Date'}
+
+ # Simple PyTransform using __callable__
+ - pipeline:
+ type: chain
+ transforms:
+ - type: Create
+ config:
+ elements:
+ - {price: 1.29, produce: 'Apple'}
+ - {price: 0.29, produce: 'Apricot'}
+ - {price: 0.02, produce: 'Blueberry'}
+ - {price: 0.19, produce: 'Date'}
+ - type: PyTransform
+ config:
+ constructor: __callable__
+ kwargs:
+ source: |
+ def increase(pcoll, inc):
+ return pcoll | beam.Map(lambda x: beam.Row(price=x.price +
inc, produce=x.produce))
+ inc: 0.30
+ - type: AssertEqual
+ config:
+ elements:
+ - {price: 1.59, produce: 'Apple'}
+ - {price: 0.59, produce: 'Apricot'}
+ - {price: 0.32, produce: 'Blueberry'}
+ - {price: 0.49, produce: 'Date'}
+
+ # Write a CSV file for the following PyTransform pipelines to read
+ - pipeline:
+ type: chain
+ transforms:
+ - type: Create
+ config:
+ elements:
+ - {price: 1.29, produce: 'Apple'}
+ - {price: 0.29, produce: 'Apricot'}
+ - {price: 0.02, produce: 'Blueberry'}
+ - {price: 0.19, produce: 'Date'}
+ - type: WriteToCsv
+ config:
+ path: "{TEMP_DIR}/out.csv"
+
+ # Simple PyTransform using ReadFromCsv with args
+ - pipeline:
+ type: composite
+ transforms:
+ - type: PyTransform
+ name: PyTransformReadFromCsv
+ input: {}
+ config:
+ constructor: apache_beam.io.ReadFromCsv
+ args: ['{TEMP_DIR}/out.csv*']
+ - type: AssertEqual
+ input: PyTransformReadFromCsv
+ config:
+ elements:
+ - {price: 1.29, produce: 'Apple'}
+ - {price: 0.29, produce: 'Apricot'}
+ - {price: 0.02, produce: 'Blueberry'}
+ - {price: 0.19, produce: 'Date'}
+
+ # Simple PyTransform using ReadFromCsv with kwargs
+ - pipeline:
+ type: composite
+ transforms:
+ - type: PyTransform
+ name: PyTransformReadFromCsv
+ input: {}
+ config:
+ constructor: apache_beam.io.ReadFromCsv
+ kwargs:
+ path: '{TEMP_DIR}/out.csv*'
+ - type: AssertEqual
+ input: PyTransformReadFromCsv
+ config:
+ elements:
+ - {price: 1.29, produce: 'Apple'}
+ - {price: 0.29, produce: 'Apricot'}
+ - {price: 0.02, produce: 'Blueberry'}
+ - {price: 0.19, produce: 'Date'}
diff --git a/sdks/python/apache_beam/yaml/yaml_mapping.py
b/sdks/python/apache_beam/yaml/yaml_mapping.py
index 5faed9af454..9985e9756fd 100644
--- a/sdks/python/apache_beam/yaml/yaml_mapping.py
+++ b/sdks/python/apache_beam/yaml/yaml_mapping.py
@@ -564,7 +564,10 @@ class _Explode(beam.PTransform):
cross_product = True
self._fields = fields
self._cross_product = cross_product
- # TODO(yaml): Support standard error handling argument.
+ # TODO(yaml):
+ # 1. Support standard error handling argument.
+ # 2. The error_handling parameter reportedly is not accepted at
+ # execution time. Needs further investigation.
self._exception_handling_args = exception_handling_args(error_handling)
@maybe_with_exception_handling
@@ -750,7 +753,7 @@ def _Partition(
outputs: list[str],
unknown_output: Optional[str] = None,
error_handling: Optional[Mapping[str, Any]] = None,
- language: Optional[str] = 'generic'):
+ language: str = 'generic'):
"""Splits an input into several distinct outputs.
Each input element will go to a distinct output based on the field or
@@ -767,7 +770,7 @@ def _Partition(
parameter.
error_handling: (Optional) Whether and how to handle errors during
partitioning.
- language: (Optional) The language of the `by` expression.
+ language: The language of the `by` expression.
"""
split_fn = _as_callable_for_pcoll(pcoll, by, 'by', language)
try: