This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 44b639fdd7e [yaml] yaml_transform.py unit tests (#27338) 44b639fdd7e is described below commit 44b639fdd7e8cdc7b29bef8bbe99545a0f3b3dc8 Author: bzablocki <bzablo...@google.com> AuthorDate: Fri Aug 25 01:09:39 2023 +0200 [yaml] yaml_transform.py unit tests (#27338) * SafeLineLoader unit tests * LightweightScope unit tests * Scope Unit Tests * rename yaml_transform_ut_test.py to yaml_transform_unit_test.py --- sdks/python/apache_beam/yaml/yaml_transform.py | 2 +- .../apache_beam/yaml/yaml_transform_scope_test.py | 204 ++++++++++++++++++++ .../python/apache_beam/yaml/yaml_transform_test.py | 133 +------------ .../apache_beam/yaml/yaml_transform_unit_test.py | 210 +++++++++++++++++++++ 4 files changed, 416 insertions(+), 133 deletions(-) diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py index f2e73029811..e2b85d8522e 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform.py +++ b/sdks/python/apache_beam/yaml/yaml_transform.py @@ -346,7 +346,7 @@ def expand_leaf_transform(spec, scope): # TODO: Handle (or at least reject) nested case. return outputs elif isinstance(outputs, (tuple, list)): - return {'out{ix}': pcoll for (ix, pcoll) in enumerate(outputs)} + return {f'out{ix}': pcoll for (ix, pcoll) in enumerate(outputs)} elif isinstance(outputs, beam.PCollection): return {'out': outputs} else: diff --git a/sdks/python/apache_beam/yaml/yaml_transform_scope_test.py b/sdks/python/apache_beam/yaml/yaml_transform_scope_test.py new file mode 100644 index 00000000000..ead5d5d66d2 --- /dev/null +++ b/sdks/python/apache_beam/yaml/yaml_transform_scope_test.py @@ -0,0 +1,204 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging +import unittest + +import yaml + +import apache_beam as beam +from apache_beam.yaml import yaml_provider +from apache_beam.yaml.yaml_transform import LightweightScope +from apache_beam.yaml.yaml_transform import SafeLineLoader +from apache_beam.yaml.yaml_transform import Scope + + +class ScopeTest(unittest.TestCase): + def get_scope_by_spec(self, p, spec): + spec = yaml.load(spec, Loader=SafeLineLoader) + + scope = Scope( + beam.pvalue.PBegin(p), {}, + spec['transforms'], + yaml_provider.standard_providers(), {}) + return scope, spec + + def test_get_pcollection_input(self): + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + elements = p | beam.Create(range(3)) + scope = Scope( + p, {'input': elements}, + transforms=[], + providers=yaml_provider.standard_providers(), + input_providers={}) + + result = scope.get_pcollection('input') + self.assertEqual("PCollection[Create/Map(decode).None]", str(result)) + + def test_get_pcollection_output(self): + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + spec = ''' + transforms: + - type: Create + config: + elements: [0, 1, 3, 4] + - type: PyMap + name: Square + input: Create + config: + fn: "lambda x: x*x" + ''' + + scope, spec = self.get_scope_by_spec(p, spec) + + self.assertEqual( + 
"PCollection[Create/Map(decode).None]", + str(scope.get_pcollection("Create"))) + + self.assertEqual( + "PCollection[Square.None]", str(scope.get_pcollection("Square"))) + + self.assertEqual( + "PCollection[Square.None]", str(scope.get_pcollection("PyMap"))) + + self.assertTrue( + scope.get_pcollection("Square") == scope.get_pcollection("PyMap")) + + def test_create_ptransform(self): + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + spec = ''' + transforms: + - type: PyMap + config: + fn: "lambda x: x*x" + ''' + scope, spec = self.get_scope_by_spec(p, spec) + + result = scope.create_ptransform(spec['transforms'][0], []) + self.assertIsInstance(result, beam.transforms.ParDo) + self.assertEqual(result.label, 'Map(lambda x: x*x)') + + result_annotations = {**result.annotations()} + target_annotations = { + 'yaml_type': 'PyMap', + 'yaml_args': '{"fn": "lambda x: x*x"}', + 'yaml_provider': '{"type": "InlineProvider"}' + } + + # Check if target_annotations is a subset of result_annotations + self.assertDictEqual( + result_annotations, { + **result_annotations, **target_annotations + }) + + def test_create_ptransform_with_inputs(self): + with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) as p: + spec = ''' + transforms: + - type: PyMap + config: + fn: "lambda x: x*x" + ''' + scope, spec = self.get_scope_by_spec(p, spec) + + result = scope.create_ptransform(spec['transforms'][0], []) + self.assertIsInstance(result, beam.transforms.ParDo) + self.assertEqual(result.label, 'Map(lambda x: x*x)') + + result_annotations = {**result.annotations()} + target_annotations = { + 'yaml_type': 'PyMap', + 'yaml_args': '{"fn": "lambda x: x*x"}', + 'yaml_provider': '{"type": "InlineProvider"}' + } + self.assertDictEqual(result_annotations, target_annotations) + + +class LightweightScopeTest(unittest.TestCase): + @staticmethod + def get_spec(): + pipeline_yaml = ''' + - 
type: PyMap + name: Square + input: elements + fn: "lambda x: x * x" + - type: PyMap + name: PyMap + input: elements + fn: "lambda x: x * x * x" + - type: Filter + name: FilterOutBigNumbers + input: PyMap + keep: "lambda x: x<100" + ''' + return yaml.load(pipeline_yaml, Loader=SafeLineLoader) + + def test_init(self): + spec = self.get_spec() + scope = LightweightScope(spec) + self.assertEqual(len(scope._transforms_by_uuid), 3) + self.assertCountEqual( + list(scope._uuid_by_name.keys()), + ["PyMap", "Square", "Filter", "FilterOutBigNumbers"]) + + def test_get_transform_id_and_output_name(self): + spec = self.get_spec() + scope = LightweightScope(spec) + transform_id, output = scope.get_transform_id_and_output_name("Square") + self.assertEqual(transform_id, spec[0]['__uuid__']) + self.assertEqual(output, None) + + def test_get_transform_id_and_output_name_with_dot(self): + spec = self.get_spec() + scope = LightweightScope(spec) + transform_id, output = \ + scope.get_transform_id_and_output_name("Square.OutputName") + self.assertEqual(transform_id, spec[0]['__uuid__']) + self.assertEqual(output, "OutputName") + + def test_get_transform_id_by_uuid(self): + spec = self.get_spec() + scope = LightweightScope(spec) + transform_id = scope.get_transform_id(spec[0]['__uuid__']) + self.assertEqual(spec[0]['__uuid__'], transform_id) + + def test_get_transform_id_by_unique_name(self): + spec = self.get_spec() + scope = LightweightScope(spec) + transform_id = scope.get_transform_id("Square") + self.assertEqual(transform_id, spec[0]['__uuid__']) + + def test_get_transform_id_by_ambiguous_name(self): + spec = self.get_spec() + scope = LightweightScope(spec) + with self.assertRaisesRegex(ValueError, r'Ambiguous.*PyMap'): + scope.get_transform_id(scope.get_transform_id(spec[1]['name'])) + + def test_get_transform_id_by_unknown_name(self): + spec = self.get_spec() + scope = LightweightScope(spec) + with self.assertRaisesRegex(ValueError, r'Unknown.*NotExistingTransform'): + 
scope.get_transform_id("NotExistingTransform") + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() diff --git a/sdks/python/apache_beam/yaml/yaml_transform_test.py b/sdks/python/apache_beam/yaml/yaml_transform_test.py index eec564b6a10..9a540e3551f 100644 --- a/sdks/python/apache_beam/yaml/yaml_transform_test.py +++ b/sdks/python/apache_beam/yaml/yaml_transform_test.py @@ -22,144 +22,13 @@ import os import tempfile import unittest -import yaml - import apache_beam as beam from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to from apache_beam.yaml import yaml_provider -from apache_beam.yaml import yaml_transform -from apache_beam.yaml.yaml_transform import LightweightScope -from apache_beam.yaml.yaml_transform import SafeLineLoader from apache_beam.yaml.yaml_transform import YamlTransform -class YamlTransformTest(unittest.TestCase): - def test_only_element(self): - self.assertEqual(yaml_transform.only_element((1, )), 1) - - -class SafeLineLoaderTest(unittest.TestCase): - def test_get_line(self): - pipeline_yaml = ''' - type: composite - input: - elements: input - transforms: - - type: PyMap - name: Square - input: elements - config: - fn: "lambda x: x * x" - - type: PyMap - name: Cube - input: elements - config: - fn: "lambda x: x * x * x" - output: - Flatten - ''' - spec = yaml.load(pipeline_yaml, Loader=SafeLineLoader) - self.assertEqual(SafeLineLoader.get_line(spec['type']), 2) - self.assertEqual(SafeLineLoader.get_line(spec['input']), 4) - self.assertEqual(SafeLineLoader.get_line(spec['transforms'][0]), 6) - self.assertEqual(SafeLineLoader.get_line(spec['transforms'][0]['type']), 6) - self.assertEqual(SafeLineLoader.get_line(spec['transforms'][0]['name']), 7) - self.assertEqual(SafeLineLoader.get_line(spec['transforms'][1]), 11) - self.assertEqual(SafeLineLoader.get_line(spec['output']), 17) - self.assertEqual(SafeLineLoader.get_line(spec['transforms']), "unknown") - - def 
test_strip_metadata(self): - spec_yaml = ''' - transforms: - - type: PyMap - name: Square - ''' - spec = yaml.load(spec_yaml, Loader=SafeLineLoader) - stripped = SafeLineLoader.strip_metadata(spec['transforms']) - - self.assertFalse(hasattr(stripped[0], '__line__')) - self.assertFalse(hasattr(stripped[0], '__uuid__')) - - def test_strip_metadata_nothing_to_strip(self): - spec_yaml = 'prop: 123' - spec = yaml.load(spec_yaml, Loader=SafeLineLoader) - stripped = SafeLineLoader.strip_metadata(spec['prop']) - - self.assertFalse(hasattr(stripped, '__line__')) - self.assertFalse(hasattr(stripped, '__uuid__')) - - -class LightweightScopeTest(unittest.TestCase): - @staticmethod - def get_spec(): - pipeline_yaml = ''' - - type: PyMap - name: Square - input: elements - config: - fn: "lambda x: x * x" - - type: PyMap - name: PyMap - input: elements - config: - fn: "lambda x: x * x * x" - - type: Filter - name: FilterOutBigNumbers - input: PyMap - config: - keep: "lambda x: x<100" - ''' - return yaml.load(pipeline_yaml, Loader=SafeLineLoader) - - def test_init(self): - spec = self.get_spec() - scope = LightweightScope(spec) - self.assertEqual(len(scope._transforms_by_uuid), 3) - self.assertCountEqual( - list(scope._uuid_by_name.keys()), - ["PyMap", "Square", "Filter", "FilterOutBigNumbers"]) - - def test_get_transform_id_and_output_name(self): - spec = self.get_spec() - scope = LightweightScope(spec) - transform_id, output = scope.get_transform_id_and_output_name("Square") - self.assertEqual(transform_id, spec[0]['__uuid__']) - self.assertEqual(output, None) - - def test_get_transform_id_and_output_name_with_dot(self): - spec = self.get_spec() - scope = LightweightScope(spec) - transform_id, output = \ - scope.get_transform_id_and_output_name("Square.OutputName") - self.assertEqual(transform_id, spec[0]['__uuid__']) - self.assertEqual(output, "OutputName") - - def test_get_transform_id_by_uuid(self): - spec = self.get_spec() - scope = LightweightScope(spec) - transform_id = 
scope.get_transform_id(spec[0]['__uuid__']) - self.assertEqual(transform_id, spec[0]['__uuid__']) - - def test_get_transform_id_by_unique_name(self): - spec = self.get_spec() - scope = LightweightScope(spec) - transform_id = scope.get_transform_id("Square") - self.assertEqual(transform_id, spec[0]['__uuid__']) - - def test_get_transform_id_by_ambiguous_name(self): - spec = self.get_spec() - scope = LightweightScope(spec) - with self.assertRaisesRegex(ValueError, r'Ambiguous.*PyMap'): - scope.get_transform_id(scope.get_transform_id(spec[1]['name'])) - - def test_get_transform_id_by_unknown_name(self): - spec = self.get_spec() - scope = LightweightScope(spec) - with self.assertRaisesRegex(ValueError, r'Unknown.*NotExistingTransform'): - scope.get_transform_id("NotExistingTransform") - - class YamlTransformE2ETest(unittest.TestCase): def test_composite(self): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( @@ -338,7 +207,7 @@ class YamlTransformE2ETest(unittest.TestCase): with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions( pickle_library='cloudpickle')) as p: # pylint: disable=expression-not-assigned - with self.assertRaises(ValueError): + with self.assertRaisesRegex(ValueError, r'Ambiguous.*'): p | YamlTransform( ''' type: composite diff --git a/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py b/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py new file mode 100644 index 00000000000..d903d85cd21 --- /dev/null +++ b/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py @@ -0,0 +1,210 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import logging +import unittest + +import yaml + +import apache_beam as beam +from apache_beam.yaml import yaml_provider +from apache_beam.yaml import yaml_transform +from apache_beam.yaml.yaml_transform import SafeLineLoader +from apache_beam.yaml.yaml_transform import Scope +from apache_beam.yaml.yaml_transform import expand_composite_transform +from apache_beam.yaml.yaml_transform import pipeline_as_composite + + +class YamlTransformTest(unittest.TestCase): + def test_only_element(self): + self.assertEqual(yaml_transform.only_element((1, )), 1) + + +class SafeLineLoaderTest(unittest.TestCase): + def test_get_line(self): + pipeline_yaml = ''' + type: composite + input: + elements: input + transforms: + - type: PyMap + name: Square + input: elements + fn: "lambda x: x * x" + - type: PyMap + name: Cube + input: elements + fn: "lambda x: x * x * x" + output: + Flatten + ''' + spec = yaml.load(pipeline_yaml, Loader=SafeLineLoader) + self.assertEqual(SafeLineLoader.get_line(spec['type']), 2) + self.assertEqual(SafeLineLoader.get_line(spec['input']), 4) + self.assertEqual(SafeLineLoader.get_line(spec['transforms'][0]), 6) + self.assertEqual(SafeLineLoader.get_line(spec['transforms'][0]['type']), 6) + self.assertEqual(SafeLineLoader.get_line(spec['transforms'][0]['name']), 7) + self.assertEqual(SafeLineLoader.get_line(spec['transforms'][1]), 10) + self.assertEqual(SafeLineLoader.get_line(spec['output']), 15) + self.assertEqual(SafeLineLoader.get_line(spec['transforms']), "unknown") + + def test_strip_metadata(self): + spec_yaml = ''' + transforms: + - type: 
PyMap + name: Square + ''' + spec = yaml.load(spec_yaml, Loader=SafeLineLoader) + stripped = SafeLineLoader.strip_metadata(spec['transforms']) + + self.assertFalse(hasattr(stripped[0], '__line__')) + self.assertFalse(hasattr(stripped[0], '__uuid__')) + + def test_strip_metadata_nothing_to_strip(self): + spec_yaml = 'prop: 123' + spec = yaml.load(spec_yaml, Loader=SafeLineLoader) + stripped = SafeLineLoader.strip_metadata(spec['prop']) + + self.assertFalse(hasattr(stripped, '__line__')) + self.assertFalse(hasattr(stripped, '__uuid__')) + + +def new_pipeline(): + return beam.Pipeline( + options=beam.options.pipeline_options.PipelineOptions( + pickle_library='cloudpickle')) + + +class MainTest(unittest.TestCase): + def get_scope_by_spec(self, p, spec, inputs=None): + if inputs is None: + inputs = {} + spec = yaml.load(spec, Loader=SafeLineLoader) + + scope = Scope( + beam.pvalue.PBegin(p), + inputs, + spec['transforms'], + yaml_provider.standard_providers(), {}) + return scope, spec + + def test_pipeline_as_composite_with_type_transforms(self): + spec = ''' + type: composite + transforms: + - type: Create + config: + elements: [0,1,2] + - type: PyMap + config: + fn: 'lambda x: x*x' + ''' + spec = yaml.load(spec, Loader=SafeLineLoader) + result = pipeline_as_composite(spec) + + self.assertEqual(result['type'], 'composite') + self.assertEqual(result['name'], None) + + def test_pipeline_as_composite_with_transforms(self): + spec = ''' + transforms: + - type: Create + config: + elements: [0,1,2] + - type: PyMap + config: + fn: 'lambda x: x*x' + ''' + spec = yaml.load(spec, Loader=SafeLineLoader) + result = pipeline_as_composite(spec) + + self.assertEqual(result['type'], 'composite') + self.assertEqual(result['name'], None) + + def test_pipeline_as_composite_list(self): + spec = ''' + - type: Create + config: + elements: [0,1,2] + - type: PyMap + config: + fn: 'lambda x: x*x' + ''' + spec = yaml.load(spec, Loader=SafeLineLoader) + result = pipeline_as_composite(spec) + + 
self.assertEqual(result['type'], 'composite') + self.assertEqual(result['name'], None) + self.assertEqual(result['transforms'], spec) + self.assertTrue('__line__' in result) + self.assertTrue('__uuid__' in result) + + def test_expand_composite_transform_with_name(self): + with new_pipeline() as p: + spec = ''' + type: composite + name: Custom + transforms: + - type: Create + config: + elements: [0,1,2] + output: + Create + ''' + scope, spec = self.get_scope_by_spec(p, spec) + result = expand_composite_transform(spec, scope) + self.assertRegex( + str(result['output']), r"PCollection.*Custom/Create/Map.*") + + def test_expand_composite_transform_with_name_input(self): + with new_pipeline() as p: + spec = ''' + type: composite + input: elements + transforms: + - type: PyMap + input: input + config: + fn: 'lambda x: x*x' + output: + PyMap + ''' + elements = p | beam.Create(range(3)) + scope, spec = self.get_scope_by_spec(p, spec, + inputs={'elements': elements}) + result = expand_composite_transform(spec, scope) + + self.assertRegex(str(result['output']), r"PCollection.*Composite/Map.*") + + def test_expand_composite_transform_root(self): + with new_pipeline() as p: + spec = ''' + type: composite + transforms: + - type: Create + config: + elements: [0,1,2] + output: + Create + ''' + scope, spec = self.get_scope_by_spec(p, spec) + result = expand_composite_transform(spec, scope) + self.assertRegex(str(result['output']), r"PCollection.*Create/Map.*") + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main()