This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 78eb4a2a2d7 fix: Enable `create_test` to correctly parse and apply
external providers defined in YAML pipeline specifications. (#37216)
78eb4a2a2d7 is described below
commit 78eb4a2a2d7153e24b86cfea418b9b317ebebe04
Author: liferoad <[email protected]>
AuthorDate: Mon Jan 5 08:17:10 2026 -0500
fix: Enable `create_test` to correctly parse and apply external providers
defined in YAML pipeline specifications. (#37216)
---
sdks/python/apache_beam/yaml/yaml_testing.py | 21 +++++++++---
sdks/python/apache_beam/yaml/yaml_testing_test.py | 39 +++++++++++++++++++++++
2 files changed, 55 insertions(+), 5 deletions(-)
diff --git a/sdks/python/apache_beam/yaml/yaml_testing.py b/sdks/python/apache_beam/yaml/yaml_testing.py
index ad31afa927e..ead3ab9de31 100644
--- a/sdks/python/apache_beam/yaml/yaml_testing.py
+++ b/sdks/python/apache_beam/yaml/yaml_testing.py
@@ -411,6 +411,13 @@ def create_test(
**yaml_transform.SafeLineLoader.strip_metadata(
pipeline_spec.get('options', {})))
+ providers = yaml_provider.merge_providers(
+ yaml_provider.parse_providers('', pipeline_spec.get('providers', [])),
+ {
+ 'AssertEqualAndRecord': yaml_provider.as_provider_list(
+ 'AssertEqualAndRecord', AssertEqualAndRecord)
+ })
+
def get_name(transform):
if 'name' in transform:
return str(transform['name'])
@@ -428,7 +435,8 @@ def create_test(
mock_outputs = [{
'name': get_name(t),
'elements': [
- _try_row_as_dict(row) for row in _first_n(t, options, max_num_inputs)
+ _try_row_as_dict(row)
+ for row in _first_n(t, options, max_num_inputs, providers)
],
} for t in input_transforms]
@@ -504,15 +512,18 @@ class RecordElements(beam.PTransform):
return pcoll | beam.Map(record)
-def _first_n(transform_spec, options, n):
+def _first_n(transform_spec, options, n, providers=None):
recorder = RecordElements(n)
+ if providers is None:
+ providers = {
+ 'AssertEqualAndRecord': yaml_provider.as_provider_list(
+ 'AssertEqualAndRecord', AssertEqualAndRecord)
+ }
try:
with beam.Pipeline(options=options) as p:
_ = (
p
- | yaml_transform.YamlTransform(
- transform_spec,
- providers={'AssertEqualAndRecord': AssertEqualAndRecord})
+ | yaml_transform.YamlTransform(transform_spec, providers=providers)
| recorder)
except _DoneException:
pass
diff --git a/sdks/python/apache_beam/yaml/yaml_testing_test.py b/sdks/python/apache_beam/yaml/yaml_testing_test.py
index 9bb0e64b6db..70e9246e4d3 100644
--- a/sdks/python/apache_beam/yaml/yaml_testing_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_testing_test.py
@@ -356,6 +356,45 @@ class YamlTestingTest(unittest.TestCase):
}]
})
+ def test_create_with_external_providers(self):
+ """Test that create_test works with external providers defined in the
+ pipeline spec.
+
+ This test validates the fix for issue #37136 where external providers
+ defined in YAML files were not recognized when running tests.
+ """
+ pipeline = '''
+ pipeline:
+ type: chain
+ transforms:
+ - type: Create
+ config:
+ elements:
+ - {a: 1, b: 2}
+ - {a: 2, b: 3}
+ - {a: 3, b: 4}
+ - {a: 4, b: 5}
+ - {a: 5, b: 6}
+ - type: MyCustomTransform
+ - type: LogForTesting
+ providers:
+ - type: yaml
+ transforms:
+ MyCustomTransform:
+ body:
+ type: MapToFields
+ config:
+ language: python
+ fields:
+ sum_ab: a + b
+ '''
+ test_spec = yaml_testing.create_test(  # MyCustomTransform is defined only under the spec's 'providers' section
+ pipeline, max_num_inputs=10, min_num_outputs=3)
+
+ self.assertEqual(len(test_spec['expected_inputs']), 1)
+ self.assertGreaterEqual(len(test_spec['expected_inputs'][0]['elements']), 3)
+ yaml_testing.run_test(pipeline, test_spec)  # the generated spec should also pass against the same pipeline
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)