This is an automated email from the ASF dual-hosted git repository. robertwb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new d799365dbaf [Yaml] Unit Tests for SafeLineLoader and LightweightScope (yaml/yaml_transform.py) (#27086)
d799365dbaf is described below

commit d799365dbafeddc5a87722f41960c5e683ae9922
Author: bzablocki <bzablo...@google.com>
AuthorDate: Wed Jul 12 00:40:20 2023 +0200

    [Yaml] Unit Tests for SafeLineLoader and LightweightScope (yaml/yaml_transform.py) (#27086)
---
 .../python/apache_beam/yaml/yaml_transform_test.py | 126 +++++++++++++++++++++
 1 file changed, 126 insertions(+)

diff --git a/sdks/python/apache_beam/yaml/yaml_transform_test.py b/sdks/python/apache_beam/yaml/yaml_transform_test.py
index 4ccc50da757..ec5b122741a 100644
--- a/sdks/python/apache_beam/yaml/yaml_transform_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_transform_test.py
@@ -22,14 +22,140 @@ import os
 import tempfile
 import unittest
 
+import yaml
+
 import apache_beam as beam
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
 from apache_beam.yaml import yaml_provider
+from apache_beam.yaml import yaml_transform
+from apache_beam.yaml.yaml_transform import LightweightScope
+from apache_beam.yaml.yaml_transform import SafeLineLoader
 from apache_beam.yaml.yaml_transform import YamlTransform
 
 
 class YamlTransformTest(unittest.TestCase):
+  def test_only_element(self):
+    self.assertEqual(yaml_transform.only_element((1, )), 1)
+
+
+class SafeLineLoaderTest(unittest.TestCase):
+  def test_get_line(self):
+    pipeline_yaml = '''
+          type: composite
+          input:
+              elements: input
+          transforms:
+            - type: PyMap
+              name: Square
+              input: elements
+              fn: "lambda x: x * x"
+            - type: PyMap
+              name: Cube
+              input: elements
+              fn: "lambda x: x * x * x"
+          output:
+              Flatten
+          '''
+    spec = yaml.load(pipeline_yaml, Loader=SafeLineLoader)
+    self.assertEqual(SafeLineLoader.get_line(spec['type']), 2)
+    self.assertEqual(SafeLineLoader.get_line(spec['input']), 4)
+    self.assertEqual(SafeLineLoader.get_line(spec['transforms'][0]), 6)
+    self.assertEqual(SafeLineLoader.get_line(spec['transforms'][0]['type']), 6)
+    self.assertEqual(SafeLineLoader.get_line(spec['transforms'][0]['name']), 7)
+    self.assertEqual(SafeLineLoader.get_line(spec['transforms'][1]), 10)
+    self.assertEqual(SafeLineLoader.get_line(spec['output']), 15)
+    self.assertEqual(SafeLineLoader.get_line(spec['transforms']), "unknown")
+
+  def test_strip_metadata(self):
+    spec_yaml = '''
+    transforms:
+      - type: PyMap
+        name: Square
+    '''
+    spec = yaml.load(spec_yaml, Loader=SafeLineLoader)
+    stripped = SafeLineLoader.strip_metadata(spec['transforms'])
+
+    self.assertFalse(hasattr(stripped[0], '__line__'))
+    self.assertFalse(hasattr(stripped[0], '__uuid__'))
+
+  def test_strip_metadata_nothing_to_strip(self):
+    spec_yaml = 'prop: 123'
+    spec = yaml.load(spec_yaml, Loader=SafeLineLoader)
+    stripped = SafeLineLoader.strip_metadata(spec['prop'])
+
+    self.assertFalse(hasattr(stripped, '__line__'))
+    self.assertFalse(hasattr(stripped, '__uuid__'))
+
+
+class LightweightScopeTest(unittest.TestCase):
+  @staticmethod
+  def get_spec():
+    pipeline_yaml = '''
+          - type: PyMap
+            name: Square
+            input: elements
+            fn: "lambda x: x * x"
+          - type: PyMap
+            name: PyMap
+            input: elements
+            fn: "lambda x: x * x * x"
+          - type: Filter
+            name: FilterOutBigNumbers
+            input: PyMap
+            keep: "lambda x: x<100"
+          '''
+    return yaml.load(pipeline_yaml, Loader=SafeLineLoader)
+
+  def test_init(self):
+    spec = self.get_spec()
+    scope = LightweightScope(spec)
+    self.assertEqual(len(scope._transforms_by_uuid), 3)
+    self.assertCountEqual(
+        list(scope._uuid_by_name.keys()),
+        ["PyMap", "Square", "Filter", "FilterOutBigNumbers"])
+
+  def test_get_transform_id_and_output_name(self):
+    spec = self.get_spec()
+    scope = LightweightScope(spec)
+    transform_id, output = scope.get_transform_id_and_output_name("Square")
+    self.assertEqual(transform_id, spec[0]['__uuid__'])
+    self.assertEqual(output, None)
+
+  def test_get_transform_id_and_output_name_with_dot(self):
+    spec = self.get_spec()
+    scope = LightweightScope(spec)
+    transform_id, output = \
+      scope.get_transform_id_and_output_name("Square.OutputName")
+    self.assertEqual(transform_id, spec[0]['__uuid__'])
+    self.assertEqual(output, "OutputName")
+
+  def test_get_transform_id_by_uuid(self):
+    spec = self.get_spec()
+    scope = LightweightScope(spec)
+    transform_id = scope.get_transform_id(spec[0]['__uuid__'])
+    self.assertEqual(transform_id, spec[0]['__uuid__'])
+
+  def test_get_transform_id_by_unique_name(self):
+    spec = self.get_spec()
+    scope = LightweightScope(spec)
+    transform_id = scope.get_transform_id("Square")
+    self.assertEqual(transform_id, spec[0]['__uuid__'])
+
+  def test_get_transform_id_by_ambiguous_name(self):
+    spec = self.get_spec()
+    scope = LightweightScope(spec)
+    with self.assertRaisesRegex(ValueError, r'Ambiguous.*PyMap'):
+      scope.get_transform_id(scope.get_transform_id(spec[1]['name']))
+
+  def test_get_transform_id_by_unknown_name(self):
+    spec = self.get_spec()
+    scope = LightweightScope(spec)
+    with self.assertRaisesRegex(ValueError, r'Unknown.*NotExistingTransform'):
+      scope.get_transform_id("NotExistingTransform")
+
+
+class YamlTransformE2ETest(unittest.TestCase):
   def test_composite(self):
     with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
         pickle_library='cloudpickle')) as p: