This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d799365dbaf [Yaml] Unit Tests for SafeLineLoader and LightweightScope (yaml/yaml_transform.py) (#27086)
d799365dbaf is described below

commit d799365dbafeddc5a87722f41960c5e683ae9922
Author: bzablocki <bzablo...@google.com>
AuthorDate: Wed Jul 12 00:40:20 2023 +0200

    [Yaml] Unit Tests for SafeLineLoader and LightweightScope (yaml/yaml_transform.py) (#27086)
---
 .../python/apache_beam/yaml/yaml_transform_test.py | 126 +++++++++++++++++++++
 1 file changed, 126 insertions(+)

diff --git a/sdks/python/apache_beam/yaml/yaml_transform_test.py b/sdks/python/apache_beam/yaml/yaml_transform_test.py
index 4ccc50da757..ec5b122741a 100644
--- a/sdks/python/apache_beam/yaml/yaml_transform_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_transform_test.py
@@ -22,14 +22,140 @@ import os
 import tempfile
 import unittest
 
+import yaml
+
 import apache_beam as beam
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
 from apache_beam.yaml import yaml_provider
+from apache_beam.yaml import yaml_transform
+from apache_beam.yaml.yaml_transform import LightweightScope
+from apache_beam.yaml.yaml_transform import SafeLineLoader
 from apache_beam.yaml.yaml_transform import YamlTransform
 
 
 class YamlTransformTest(unittest.TestCase):
+  def test_only_element(self):  # a 1-tuple should yield its single item
+    self.assertEqual(yaml_transform.only_element((1, )), 1)
+
+
+class SafeLineLoaderTest(unittest.TestCase):
+  def test_get_line(self):  # expected lines are 1-based within the literal
+    pipeline_yaml = '''
+          type: composite
+          input:
+              elements: input
+          transforms:
+            - type: PyMap
+              name: Square
+              input: elements
+              fn: "lambda x: x * x"
+            - type: PyMap
+              name: Cube
+              input: elements
+              fn: "lambda x: x * x * x"
+          output:
+              Flatten
+          '''
+    spec = yaml.load(pipeline_yaml, Loader=SafeLineLoader)
+    self.assertEqual(SafeLineLoader.get_line(spec['type']), 2)
+    self.assertEqual(SafeLineLoader.get_line(spec['input']), 4)
+    self.assertEqual(SafeLineLoader.get_line(spec['transforms'][0]), 6)
+    self.assertEqual(SafeLineLoader.get_line(spec['transforms'][0]['type']), 6)
+    self.assertEqual(SafeLineLoader.get_line(spec['transforms'][0]['name']), 7)
+    self.assertEqual(SafeLineLoader.get_line(spec['transforms'][1]), 10)
+    self.assertEqual(SafeLineLoader.get_line(spec['output']), 15)
+    self.assertEqual(SafeLineLoader.get_line(spec['transforms']), "unknown")
+
+  def test_strip_metadata(self):
+    spec_yaml = '''
+    transforms:
+      - type: PyMap
+        name: Square
+    '''
+    spec = yaml.load(spec_yaml, Loader=SafeLineLoader)
+    stripped = SafeLineLoader.strip_metadata(spec['transforms'])
+    # Metadata is attached as dict keys, so assert on key membership.
+    self.assertNotIn('__line__', stripped[0])
+    self.assertNotIn('__uuid__', stripped[0])
+
+  def test_strip_metadata_nothing_to_strip(self):
+    spec_yaml = 'prop: 123'
+    spec = yaml.load(spec_yaml, Loader=SafeLineLoader)
+    stripped = SafeLineLoader.strip_metadata(spec['prop'])
+
+    # A plain scalar has no metadata keys; expect it back unchanged.
+    self.assertEqual(stripped, 123)
+
+
+class LightweightScopeTest(unittest.TestCase):
+  @staticmethod
+  def get_spec():  # NB: "PyMap" appears as a type (twice) and as a name
+    pipeline_yaml = '''
+          - type: PyMap
+            name: Square
+            input: elements
+            fn: "lambda x: x * x"
+          - type: PyMap
+            name: PyMap
+            input: elements
+            fn: "lambda x: x * x * x"
+          - type: Filter
+            name: FilterOutBigNumbers
+            input: PyMap
+            keep: "lambda x: x<100"
+          '''
+    return yaml.load(pipeline_yaml, Loader=SafeLineLoader)
+
+  def test_init(self):
+    spec = self.get_spec()
+    scope = LightweightScope(spec)
+    self.assertEqual(len(scope._transforms_by_uuid), 3)
+    self.assertCountEqual(
+        scope._uuid_by_name.keys(),
+        ["PyMap", "Square", "Filter", "FilterOutBigNumbers"])
+
+  def test_get_transform_id_and_output_name(self):
+    spec = self.get_spec()
+    scope = LightweightScope(spec)
+    transform_id, output = scope.get_transform_id_and_output_name("Square")
+    self.assertEqual(transform_id, spec[0]['__uuid__'])
+    self.assertIsNone(output)
+
+  def test_get_transform_id_and_output_name_with_dot(self):
+    spec = self.get_spec()
+    scope = LightweightScope(spec)
+    transform_id, output = scope.get_transform_id_and_output_name(
+        "Square.OutputName")
+    self.assertEqual(transform_id, spec[0]['__uuid__'])
+    self.assertEqual(output, "OutputName")
+
+  def test_get_transform_id_by_uuid(self):
+    spec = self.get_spec()
+    scope = LightweightScope(spec)
+    transform_id = scope.get_transform_id(spec[0]['__uuid__'])
+    self.assertEqual(transform_id, spec[0]['__uuid__'])
+
+  def test_get_transform_id_by_unique_name(self):
+    spec = self.get_spec()
+    scope = LightweightScope(spec)
+    transform_id = scope.get_transform_id("Square")
+    self.assertEqual(transform_id, spec[0]['__uuid__'])
+
+  def test_get_transform_id_by_ambiguous_name(self):
+    spec = self.get_spec()
+    scope = LightweightScope(spec)
+    with self.assertRaisesRegex(ValueError, r'Ambiguous.*PyMap'):
+      scope.get_transform_id(spec[1]['name'])
+
+  def test_get_transform_id_by_unknown_name(self):
+    spec = self.get_spec()
+    scope = LightweightScope(spec)
+    with self.assertRaisesRegex(ValueError, r'Unknown.*NotExistingTransform'):
+      scope.get_transform_id("NotExistingTransform")
+
+
+class YamlTransformE2ETest(unittest.TestCase):
   def test_composite(self):
     with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
         pickle_library='cloudpickle')) as p:

Reply via email to