This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 44b639fdd7e [yaml] yaml_transform.py unit tests  (#27338)
44b639fdd7e is described below

commit 44b639fdd7e8cdc7b29bef8bbe99545a0f3b3dc8
Author: bzablocki <bzablo...@google.com>
AuthorDate: Fri Aug 25 01:09:39 2023 +0200

    [yaml] yaml_transform.py unit tests  (#27338)
    
    * SafeLineLoader unit tests
    * LightweightScope unit tests
    * Scope Unit Tests
    * rename yaml_transform_ut_test.py to yaml_transform_unit_test.py
---
 sdks/python/apache_beam/yaml/yaml_transform.py     |   2 +-
 .../apache_beam/yaml/yaml_transform_scope_test.py  | 204 ++++++++++++++++++++
 .../python/apache_beam/yaml/yaml_transform_test.py | 133 +------------
 .../apache_beam/yaml/yaml_transform_unit_test.py   | 210 +++++++++++++++++++++
 4 files changed, 416 insertions(+), 133 deletions(-)

diff --git a/sdks/python/apache_beam/yaml/yaml_transform.py b/sdks/python/apache_beam/yaml/yaml_transform.py
index f2e73029811..e2b85d8522e 100644
--- a/sdks/python/apache_beam/yaml/yaml_transform.py
+++ b/sdks/python/apache_beam/yaml/yaml_transform.py
@@ -346,7 +346,7 @@ def expand_leaf_transform(spec, scope):
     # TODO: Handle (or at least reject) nested case.
     return outputs
   elif isinstance(outputs, (tuple, list)):
-    return {'out{ix}': pcoll for (ix, pcoll) in enumerate(outputs)}
+    return {f'out{ix}': pcoll for (ix, pcoll) in enumerate(outputs)}
   elif isinstance(outputs, beam.PCollection):
     return {'out': outputs}
   else:
diff --git a/sdks/python/apache_beam/yaml/yaml_transform_scope_test.py b/sdks/python/apache_beam/yaml/yaml_transform_scope_test.py
new file mode 100644
index 00000000000..ead5d5d66d2
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/yaml_transform_scope_test.py
@@ -0,0 +1,204 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+import unittest
+
+import yaml
+
+import apache_beam as beam
+from apache_beam.yaml import yaml_provider
+from apache_beam.yaml.yaml_transform import LightweightScope
+from apache_beam.yaml.yaml_transform import SafeLineLoader
+from apache_beam.yaml.yaml_transform import Scope
+
+
+class ScopeTest(unittest.TestCase):
+  def get_scope_by_spec(self, p, spec):
+    """Parses `spec` and returns a (Scope over its transforms, spec) pair."""
+    spec = yaml.load(spec, Loader=SafeLineLoader)
+    scope = Scope(
+        beam.pvalue.PBegin(p), {},
+        spec['transforms'],
+        yaml_provider.standard_providers(), {})
+    return scope, spec
+
+  def test_get_pcollection_input(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      elements = p | beam.Create(range(3))
+      scope = Scope(
+          p, {'input': elements},
+          transforms=[],
+          providers=yaml_provider.standard_providers(),
+          input_providers={})
+
+      result = scope.get_pcollection('input')
+      self.assertEqual("PCollection[Create/Map(decode).None]", str(result))
+
+  def test_get_pcollection_output(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      spec = '''
+        transforms:
+          - type: Create
+            config:
+              elements: [0, 1, 3, 4]
+          - type: PyMap
+            name: Square
+            input: Create
+            config:
+              fn: "lambda x: x*x"
+        '''
+      # Must stay inside `with`: the pipeline runs when the block exits.
+      scope, spec = self.get_scope_by_spec(p, spec)
+
+      self.assertEqual(
+          "PCollection[Create/Map(decode).None]",
+          str(scope.get_pcollection("Create")))
+
+      self.assertEqual(
+          "PCollection[Square.None]", str(scope.get_pcollection("Square")))
+
+      self.assertEqual(
+          "PCollection[Square.None]", str(scope.get_pcollection("PyMap")))
+
+      # The transform is reachable both by its name and by its type.
+      self.assertEqual(
+          scope.get_pcollection("Square"), scope.get_pcollection("PyMap"))
+
+  def test_create_ptransform(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      spec = '''
+        transforms:
+          - type: PyMap
+            config:
+              fn: "lambda x: x*x"
+        '''
+      scope, spec = self.get_scope_by_spec(p, spec)
+
+      result = scope.create_ptransform(spec['transforms'][0], [])
+      self.assertIsInstance(result, beam.transforms.ParDo)
+      self.assertEqual(result.label, 'Map(lambda x: x*x)')
+
+      result_annotations = dict(result.annotations())
+      target_annotations = {
+          'yaml_type': 'PyMap',
+          'yaml_args': '{"fn": "lambda x: x*x"}',
+          'yaml_provider': '{"type": "InlineProvider"}'
+      }
+
+      # Check that target_annotations is a subset of result_annotations.
+      self.assertDictEqual(
+          result_annotations, {**result_annotations, **target_annotations})
+
+  def test_create_ptransform_with_inputs(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      spec = '''
+        transforms:
+          - type: PyMap
+            config:
+              fn: "lambda x: x*x"
+        '''
+      scope, spec = self.get_scope_by_spec(p, spec)
+      elements = p | beam.Create(range(3))
+
+      result = scope.create_ptransform(spec['transforms'][0], [elements])
+      self.assertIsInstance(result, beam.transforms.ParDo)
+      self.assertEqual(result.label, 'Map(lambda x: x*x)')
+
+      result_annotations = dict(result.annotations())
+      target_annotations = {
+          'yaml_type': 'PyMap',
+          'yaml_args': '{"fn": "lambda x: x*x"}',
+          'yaml_provider': '{"type": "InlineProvider"}'
+      }
+      self.assertDictEqual(result_annotations, target_annotations)
+
+
+class LightweightScopeTest(unittest.TestCase):
+  @staticmethod
+  def get_spec():
+    pipeline_yaml = '''
+          - type: PyMap
+            name: Square
+            input: elements
+            config: {fn: "lambda x: x * x"}
+          - type: PyMap
+            name: PyMap
+            input: elements
+            config: {fn: "lambda x: x * x * x"}
+          - type: Filter
+            name: FilterOutBigNumbers
+            input: PyMap
+            config: {keep: "lambda x: x<100"}
+          '''
+    return yaml.load(pipeline_yaml, Loader=SafeLineLoader)
+
+  def test_init(self):
+    spec = self.get_spec()
+    scope = LightweightScope(spec)
+    self.assertEqual(len(scope._transforms_by_uuid), 3)
+    self.assertCountEqual(
+        scope._uuid_by_name,
+        ["PyMap", "Square", "Filter", "FilterOutBigNumbers"])
+
+  def test_get_transform_id_and_output_name(self):
+    spec = self.get_spec()
+    scope = LightweightScope(spec)
+    transform_id, output = scope.get_transform_id_and_output_name("Square")
+    self.assertEqual(transform_id, spec[0]['__uuid__'])
+    self.assertIsNone(output)
+
+  def test_get_transform_id_and_output_name_with_dot(self):
+    spec = self.get_spec()
+    scope = LightweightScope(spec)
+    transform_id, output = scope.get_transform_id_and_output_name(
+        "Square.OutputName")
+    self.assertEqual(transform_id, spec[0]['__uuid__'])
+    self.assertEqual(output, "OutputName")
+
+  def test_get_transform_id_by_uuid(self):
+    spec = self.get_spec()
+    scope = LightweightScope(spec)
+    transform_id = scope.get_transform_id(spec[0]['__uuid__'])
+    self.assertEqual(transform_id, spec[0]['__uuid__'])
+
+  def test_get_transform_id_by_unique_name(self):
+    spec = self.get_spec()
+    scope = LightweightScope(spec)
+    transform_id = scope.get_transform_id("Square")
+    self.assertEqual(transform_id, spec[0]['__uuid__'])
+
+  def test_get_transform_id_by_ambiguous_name(self):
+    spec = self.get_spec()
+    scope = LightweightScope(spec)
+    with self.assertRaisesRegex(ValueError, r'Ambiguous.*PyMap'):
+      scope.get_transform_id(spec[1]['name'])  # two transforms match 'PyMap'
+
+  def test_get_transform_id_by_unknown_name(self):
+    spec = self.get_spec()
+    scope = LightweightScope(spec)
+    with self.assertRaisesRegex(ValueError, r'Unknown.*NotExistingTransform'):
+      scope.get_transform_id("NotExistingTransform")
+
+
+if __name__ == '__main__':
+  logging.root.setLevel(logging.INFO)
+  unittest.main()
diff --git a/sdks/python/apache_beam/yaml/yaml_transform_test.py b/sdks/python/apache_beam/yaml/yaml_transform_test.py
index eec564b6a10..9a540e3551f 100644
--- a/sdks/python/apache_beam/yaml/yaml_transform_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_transform_test.py
@@ -22,144 +22,13 @@ import os
 import tempfile
 import unittest
 
-import yaml
-
 import apache_beam as beam
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
 from apache_beam.yaml import yaml_provider
-from apache_beam.yaml import yaml_transform
-from apache_beam.yaml.yaml_transform import LightweightScope
-from apache_beam.yaml.yaml_transform import SafeLineLoader
 from apache_beam.yaml.yaml_transform import YamlTransform
 
 
-class YamlTransformTest(unittest.TestCase):
-  def test_only_element(self):
-    self.assertEqual(yaml_transform.only_element((1, )), 1)
-
-
-class SafeLineLoaderTest(unittest.TestCase):
-  def test_get_line(self):
-    pipeline_yaml = '''
-          type: composite
-          input:
-              elements: input
-          transforms:
-            - type: PyMap
-              name: Square
-              input: elements
-              config:
-                  fn: "lambda x: x * x"
-            - type: PyMap
-              name: Cube
-              input: elements
-              config:
-                  fn: "lambda x: x * x * x"
-          output:
-              Flatten
-          '''
-    spec = yaml.load(pipeline_yaml, Loader=SafeLineLoader)
-    self.assertEqual(SafeLineLoader.get_line(spec['type']), 2)
-    self.assertEqual(SafeLineLoader.get_line(spec['input']), 4)
-    self.assertEqual(SafeLineLoader.get_line(spec['transforms'][0]), 6)
-    self.assertEqual(SafeLineLoader.get_line(spec['transforms'][0]['type']), 6)
-    self.assertEqual(SafeLineLoader.get_line(spec['transforms'][0]['name']), 7)
-    self.assertEqual(SafeLineLoader.get_line(spec['transforms'][1]), 11)
-    self.assertEqual(SafeLineLoader.get_line(spec['output']), 17)
-    self.assertEqual(SafeLineLoader.get_line(spec['transforms']), "unknown")
-
-  def test_strip_metadata(self):
-    spec_yaml = '''
-    transforms:
-      - type: PyMap
-        name: Square
-    '''
-    spec = yaml.load(spec_yaml, Loader=SafeLineLoader)
-    stripped = SafeLineLoader.strip_metadata(spec['transforms'])
-
-    self.assertFalse(hasattr(stripped[0], '__line__'))
-    self.assertFalse(hasattr(stripped[0], '__uuid__'))
-
-  def test_strip_metadata_nothing_to_strip(self):
-    spec_yaml = 'prop: 123'
-    spec = yaml.load(spec_yaml, Loader=SafeLineLoader)
-    stripped = SafeLineLoader.strip_metadata(spec['prop'])
-
-    self.assertFalse(hasattr(stripped, '__line__'))
-    self.assertFalse(hasattr(stripped, '__uuid__'))
-
-
-class LightweightScopeTest(unittest.TestCase):
-  @staticmethod
-  def get_spec():
-    pipeline_yaml = '''
-          - type: PyMap
-            name: Square
-            input: elements
-            config:
-                fn: "lambda x: x * x"
-          - type: PyMap
-            name: PyMap
-            input: elements
-            config:
-               fn: "lambda x: x * x * x"
-          - type: Filter
-            name: FilterOutBigNumbers
-            input: PyMap
-            config:
-                keep: "lambda x: x<100"
-          '''
-    return yaml.load(pipeline_yaml, Loader=SafeLineLoader)
-
-  def test_init(self):
-    spec = self.get_spec()
-    scope = LightweightScope(spec)
-    self.assertEqual(len(scope._transforms_by_uuid), 3)
-    self.assertCountEqual(
-        list(scope._uuid_by_name.keys()),
-        ["PyMap", "Square", "Filter", "FilterOutBigNumbers"])
-
-  def test_get_transform_id_and_output_name(self):
-    spec = self.get_spec()
-    scope = LightweightScope(spec)
-    transform_id, output = scope.get_transform_id_and_output_name("Square")
-    self.assertEqual(transform_id, spec[0]['__uuid__'])
-    self.assertEqual(output, None)
-
-  def test_get_transform_id_and_output_name_with_dot(self):
-    spec = self.get_spec()
-    scope = LightweightScope(spec)
-    transform_id, output = \
-      scope.get_transform_id_and_output_name("Square.OutputName")
-    self.assertEqual(transform_id, spec[0]['__uuid__'])
-    self.assertEqual(output, "OutputName")
-
-  def test_get_transform_id_by_uuid(self):
-    spec = self.get_spec()
-    scope = LightweightScope(spec)
-    transform_id = scope.get_transform_id(spec[0]['__uuid__'])
-    self.assertEqual(transform_id, spec[0]['__uuid__'])
-
-  def test_get_transform_id_by_unique_name(self):
-    spec = self.get_spec()
-    scope = LightweightScope(spec)
-    transform_id = scope.get_transform_id("Square")
-    self.assertEqual(transform_id, spec[0]['__uuid__'])
-
-  def test_get_transform_id_by_ambiguous_name(self):
-    spec = self.get_spec()
-    scope = LightweightScope(spec)
-    with self.assertRaisesRegex(ValueError, r'Ambiguous.*PyMap'):
-      scope.get_transform_id(scope.get_transform_id(spec[1]['name']))
-
-  def test_get_transform_id_by_unknown_name(self):
-    spec = self.get_spec()
-    scope = LightweightScope(spec)
-    with self.assertRaisesRegex(ValueError, r'Unknown.*NotExistingTransform'):
-      scope.get_transform_id("NotExistingTransform")
-
-
 class YamlTransformE2ETest(unittest.TestCase):
   def test_composite(self):
     with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
@@ -338,7 +207,7 @@ class YamlTransformE2ETest(unittest.TestCase):
     with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
         pickle_library='cloudpickle')) as p:
       # pylint: disable=expression-not-assigned
-      with self.assertRaises(ValueError):
+      with self.assertRaisesRegex(ValueError, r'Ambiguous.*'):
         p | YamlTransform(
             '''
             type: composite
diff --git a/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py b/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py
new file mode 100644
index 00000000000..d903d85cd21
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/yaml_transform_unit_test.py
@@ -0,0 +1,210 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import logging
+import unittest
+
+import yaml
+
+import apache_beam as beam
+from apache_beam.yaml import yaml_provider
+from apache_beam.yaml import yaml_transform
+from apache_beam.yaml.yaml_transform import SafeLineLoader
+from apache_beam.yaml.yaml_transform import Scope
+from apache_beam.yaml.yaml_transform import expand_composite_transform
+from apache_beam.yaml.yaml_transform import pipeline_as_composite
+
+
+class YamlTransformTest(unittest.TestCase):
+  def test_only_element(self):
+    self.assertEqual(1, yaml_transform.only_element((1, )))
+
+
+class SafeLineLoaderTest(unittest.TestCase):
+  def test_get_line(self):
+    pipeline_yaml = '''
+          type: composite
+          input:
+              elements: input
+          transforms:
+            - type: PyMap
+              name: Square
+              input: elements
+              fn: "lambda x: x * x"
+            - type: PyMap
+              name: Cube
+              input: elements
+              fn: "lambda x: x * x * x"
+          output:
+              Flatten
+          '''
+    spec = yaml.load(pipeline_yaml, Loader=SafeLineLoader)
+    self.assertEqual(SafeLineLoader.get_line(spec['type']), 2)
+    self.assertEqual(SafeLineLoader.get_line(spec['input']), 4)
+    self.assertEqual(SafeLineLoader.get_line(spec['transforms'][0]), 6)
+    self.assertEqual(SafeLineLoader.get_line(spec['transforms'][0]['type']), 6)
+    self.assertEqual(SafeLineLoader.get_line(spec['transforms'][0]['name']), 7)
+    self.assertEqual(SafeLineLoader.get_line(spec['transforms'][1]), 10)
+    self.assertEqual(SafeLineLoader.get_line(spec['output']), 15)
+    self.assertEqual(SafeLineLoader.get_line(spec['transforms']), "unknown")
+
+  def test_strip_metadata(self):
+    spec_yaml = '''
+    transforms:
+      - type: PyMap
+        name: Square
+    '''
+    spec = yaml.load(spec_yaml, Loader=SafeLineLoader)
+    stripped = SafeLineLoader.strip_metadata(spec['transforms'])
+    self.assertIn('__line__', spec['transforms'][0])  # input was tagged
+    self.assertNotIn('__line__', stripped[0])
+    self.assertNotIn('__uuid__', stripped[0])
+
+  def test_strip_metadata_nothing_to_strip(self):
+    spec_yaml = 'prop: 123'
+    spec = yaml.load(spec_yaml, Loader=SafeLineLoader)
+    stripped = SafeLineLoader.strip_metadata(spec['prop'])
+
+    # A plain scalar has no metadata, so it must come back unchanged.
+    self.assertEqual(stripped, 123)
+
+
+def new_pipeline():
+  options = beam.options.pipeline_options.PipelineOptions(
+      pickle_library='cloudpickle')
+  return beam.Pipeline(options=options)
+
+
+class MainTest(unittest.TestCase):
+  def get_scope_by_spec(self, p, spec, inputs=None):
+    """Parses `spec` and returns a (Scope over its transforms, spec) pair."""
+    inputs = {} if inputs is None else inputs
+    spec = yaml.load(spec, Loader=SafeLineLoader)
+
+    scope = Scope(
+        beam.pvalue.PBegin(p),
+        inputs,
+        spec['transforms'],
+        yaml_provider.standard_providers(), {})
+    return scope, spec
+
+  def test_pipeline_as_composite_with_type_transforms(self):
+    spec = '''
+      type: composite
+      transforms:
+      - type: Create
+        config:
+          elements: [0,1,2]
+      - type: PyMap
+        config:
+          fn: 'lambda x: x*x'
+      '''
+    spec = yaml.load(spec, Loader=SafeLineLoader)
+    result = pipeline_as_composite(spec)
+
+    self.assertEqual(result['type'], 'composite')
+    self.assertIsNone(result['name'])
+
+  def test_pipeline_as_composite_with_transforms(self):
+    spec = '''
+      transforms:
+      - type: Create
+        config:
+          elements: [0,1,2]
+      - type: PyMap
+        config:
+          fn: 'lambda x: x*x'
+      '''
+    spec = yaml.load(spec, Loader=SafeLineLoader)
+    result = pipeline_as_composite(spec)
+
+    self.assertEqual(result['type'], 'composite')
+    self.assertIsNone(result['name'])
+
+  def test_pipeline_as_composite_list(self):
+    spec = '''
+        - type: Create
+          config:
+            elements: [0,1,2]
+        - type: PyMap
+          config:
+            fn: 'lambda x: x*x'
+      '''
+    spec = yaml.load(spec, Loader=SafeLineLoader)
+    result = pipeline_as_composite(spec)
+
+    self.assertEqual(result['type'], 'composite')
+    self.assertIsNone(result['name'])
+    self.assertEqual(result['transforms'], spec)
+    self.assertIn('__line__', result)
+    self.assertIn('__uuid__', result)
+
+  def test_expand_composite_transform_with_name(self):
+    with new_pipeline() as p:
+      spec = '''
+        type: composite
+        name: Custom
+        transforms:
+          - type: Create
+            config:
+              elements: [0,1,2]
+        output:
+          Create
+        '''
+      scope, spec = self.get_scope_by_spec(p, spec)
+      result = expand_composite_transform(spec, scope)
+      self.assertRegex(
+          str(result['output']), r"PCollection.*Custom/Create/Map.*")
+
+  def test_expand_composite_transform_with_name_input(self):
+    with new_pipeline() as p:
+      spec = '''
+        type: composite
+        input: elements
+        transforms:
+          - type: PyMap
+            input: input
+            config:
+              fn: 'lambda x: x*x'
+        output:
+          PyMap
+        '''
+      elements = p | beam.Create(range(3))
+      scope, spec = self.get_scope_by_spec(
+          p, spec, inputs={'elements': elements})
+      result = expand_composite_transform(spec, scope)
+
+      self.assertRegex(str(result['output']), r"PCollection.*Composite/Map.*")
+
+  def test_expand_composite_transform_root(self):
+    with new_pipeline() as p:
+      spec = '''
+        type: composite
+        transforms:
+          - type: Create
+            config:
+              elements: [0,1,2]
+        output:
+          Create
+        '''
+      scope, spec = self.get_scope_by_spec(p, spec)
+      result = expand_composite_transform(spec, scope)
+      self.assertRegex(str(result['output']), r"PCollection.*Create/Map.*")
+
+
+if __name__ == '__main__':
+  logging.root.setLevel(logging.INFO)
+  unittest.main()

Reply via email to