tvalentyn commented on code in PR #35656: URL: https://github.com/apache/beam/pull/35656#discussion_r2276212260
########## sdks/python/apache_beam/internal/code_object_pickler_test.py: ########## @@ -0,0 +1,211 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import hashlib +import unittest + +from parameterized import parameterized + +from apache_beam.internal import code_object_pickler + + +def top_level_function(): + return 1 + + +top_level_lambda = lambda x: 1 + + +def get_nested_function(): + def nested_function(): + return 1 + + return nested_function + + +def get_lambda_from_dictionary(): + d = {"a": lambda x: 1, "b": lambda y: 2} + return d["a"] + + +def get_lambda_from_dictionary_same_args(): + d = {"a": lambda x: 1, "b": lambda x: x + 1} + return d["a"] + + +def function_with_lambda_default_argument(fn=lambda x: 1): + return fn + + +def function_with_function_default_argument(fn=top_level_function): + return fn + + +def function_decorator(f): + return lambda x: f(f(x)) + + +@function_decorator +def add_one(x): + return x + 1 + + +class ClassWithFunction: + def process(self): + return 1 + + +class ClassWithStaticMethod: + @staticmethod + def static_method(): + return 1 + + +class ClassWithClassMethod: + @classmethod + def class_method(cls): + return 1 + + +class ClassWithNestedFunction: + def process(self): + def 
nested_function(): + return 1 + + return nested_function + + +class ClassWithLambda: + def process(self): + return lambda: 1 + + +class ClassWithNestedClass: + class InnerClass: + def process(self): + return 1 + + +class ClassWithNestedLambda: + def process(self): + def get_lambda_from_dictionary(): + d = {"a": lambda x: 1, "b": lambda y: 2} + return d["a"] + + return get_lambda_from_dictionary() + + +test_cases = [ + ( + top_level_function, + "apache_beam.internal.code_object_pickler_test.top_level_function" + ".__code__"), + ( + top_level_lambda, + "apache_beam.internal.code_object_pickler_test.top_level_lambda" + ".__code__"), + ( + get_nested_function(), + ( + "apache_beam.internal.code_object_pickler_test.get_nested_function" + ".__code__.co_consts[nested_function]")), + ( + get_lambda_from_dictionary(), + ( + "apache_beam.internal.code_object_pickler_test" + ".get_lambda_from_dictionary.__code__.co_consts[<lambda>, ('x',)]") + ), + ( + get_lambda_from_dictionary_same_args(), + ( + "apache_beam.internal.code_object_pickler_test" + ".get_lambda_from_dictionary_same_args.__code__.co_consts" + "[<lambda>, ('x',), " + hashlib.md5( + get_lambda_from_dictionary_same_args().__code__.co_code). 
+ hexdigest() + "]")), + ( + function_with_lambda_default_argument(), + ( + "apache_beam.internal.code_object_pickler_test" + ".function_with_lambda_default_argument.__defaults__[0].__code__")), + ( + function_with_function_default_argument(), + "apache_beam.internal.code_object_pickler_test.top_level_function" + ".__code__"), + ( + add_one, + "apache_beam.internal.code_object_pickler_test.function_decorator" + ".__code__.co_consts[<lambda>]"), + ( + ClassWithFunction.process, + "apache_beam.internal.code_object_pickler_test.ClassWithFunction" + ".process.__code__"), + ( + ClassWithStaticMethod.static_method, + "apache_beam.internal.code_object_pickler_test.ClassWithStaticMethod" + ".static_method.__code__"), + ( + ClassWithClassMethod.class_method, + "apache_beam.internal.code_object_pickler_test.ClassWithClassMethod" + ".class_method.__code__"), + ( + ClassWithNestedFunction().process(), + ( + "apache_beam.internal.code_object_pickler_test" + ".ClassWithNestedFunction.process.__code__.co_consts" + "[nested_function]")), + ( + ClassWithLambda().process(), + "apache_beam.internal.code_object_pickler_test.ClassWithLambda.process" + ".__code__.co_consts[<lambda>]"), + ( + ClassWithNestedClass.InnerClass().process, + "apache_beam.internal.code_object_pickler_test.ClassWithNestedClass" + ".InnerClass.process.__code__"), + ( + ClassWithNestedLambda().process(), + ( + "apache_beam.internal.code_object_pickler_test" + ".ClassWithNestedLambda.process.__code__.co_consts" + "[get_lambda_from_dictionary].co_consts[<lambda>, ('x',)]")), + ( + ClassWithNestedLambda.process, + "apache_beam.internal.code_object_pickler_test.ClassWithNestedLambda" + ".process.__code__"), +] + + +class CodeObjectPicklerTest(unittest.TestCase): + @parameterized.expand(test_cases) + def test_get_code_path(self, callable, expected): Review Comment: ```suggestion def test_get_code_path(self, callable, expected_path): ``` ########## sdks/python/apache_beam/internal/pickle_code_path_test.py: ########## 
@@ -0,0 +1,421 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Tests for generating stable identifiers to use for Pickle serialization.""" + +import unittest + +# pylint: disable=unused-import +from apache_beam.internal import code_object_pickler +from apache_beam.internal.test_cases import after_module_add_function +from apache_beam.internal.test_cases import after_module_add_lambda_variable +from apache_beam.internal.test_cases import after_module_add_variable +from apache_beam.internal.test_cases import after_module_remove_lambda_variable +from apache_beam.internal.test_cases import after_module_remove_variable +from apache_beam.internal.test_cases import after_module_with_classes +from apache_beam.internal.test_cases import after_module_with_global_variable +from apache_beam.internal.test_cases import after_module_with_nested_function +from apache_beam.internal.test_cases import after_module_with_nested_function_2 +from apache_beam.internal.test_cases import after_module_with_single_class +from apache_beam.internal.test_cases import before_module_with_classes +from apache_beam.internal.test_cases import before_module_with_functions +from apache_beam.internal.test_cases import before_module_with_lambdas +from 
apache_beam.internal.test_cases import module_with_default_argument + + +class CodePathTest(unittest.TestCase): Review Comment: For all test methods that test `_get_code_from_stable_reference()`, you could create a test class named ``` class GetCodeFromStableReferenceTest(unittest.TestCase): ``` 1) It is more specific than `CodePathTest`. 2) You don't need to prepend `test_get_code_from_stable_reference...` to each test case in this class. 3) Tests that exercise other functionality can go in a separate test class. ########## sdks/python/apache_beam/internal/test_cases/__init__.py: ########## @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""For internal use only; no backwards-compatibility guarantees.""" Review Comment: Let's add some description here, like: ```suggestion """Test data to validate that code identifiers are invariant to small modifications.""" ``` ########## sdks/python/apache_beam/internal/pickle_code_path_test.py: ########## @@ -0,0 +1,421 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Tests for generating stable identifiers to use for Pickle serialization.""" + +import unittest + +# pylint: disable=unused-import +from apache_beam.internal import code_object_pickler +from apache_beam.internal.test_cases import after_module_add_function +from apache_beam.internal.test_cases import after_module_add_lambda_variable +from apache_beam.internal.test_cases import after_module_add_variable +from apache_beam.internal.test_cases import after_module_remove_lambda_variable +from apache_beam.internal.test_cases import after_module_remove_variable +from apache_beam.internal.test_cases import after_module_with_classes +from apache_beam.internal.test_cases import after_module_with_global_variable +from apache_beam.internal.test_cases import after_module_with_nested_function +from apache_beam.internal.test_cases import after_module_with_nested_function_2 +from apache_beam.internal.test_cases import after_module_with_single_class +from apache_beam.internal.test_cases import before_module_with_classes +from apache_beam.internal.test_cases import before_module_with_functions +from apache_beam.internal.test_cases import before_module_with_lambdas +from apache_beam.internal.test_cases import module_with_default_argument + + +class CodePathTest(unittest.TestCase): + def test_get_code_from_stable_reference_empty_path_raises_exception(self): + with self.assertRaisesRegex(ValueError, "Path must not 
be empty"): + code_object_pickler._get_code_from_stable_reference("") + + def test_get_code_from_stable_reference_invalid_default_index_raises_exception(self): # pylint: disable=line-too-long + with self.assertRaisesRegex(ValueError, "out of bounds"): + code_object_pickler._get_code_from_stable_reference( + "apache_beam.internal.test_cases.module_with_default_argument." + "function_with_lambda_default_argument.__defaults__[1]") + + def test_get_code_from_stable_reference_invalid_single_name_path_raises_exception(self): # pylint: disable=line-too-long + with self.assertRaisesRegex(AttributeError, + "Could not find code object with path"): + code_object_pickler._get_code_from_stable_reference( + "apache_beam.internal.test_cases.before_module_with_lambdas." + "my_function.__code__.co_consts[something]") + + def test_get_code_from_stable_reference_invalid_lambda_with_args_path_raises_exception(self): # pylint: disable=line-too-long + with self.assertRaisesRegex(AttributeError, + "Could not find code object with path"): + code_object_pickler._get_code_from_stable_reference( + "apache_beam.internal.test_cases.before_module_with_lambdas." + "my_function.__code__.co_consts[<lambda>, ('x',)]") + + def test_get_code_from_stable_reference_invalid_lambda_with_hash_path_raises_exception(self): # pylint: disable=line-too-long + with self.assertRaisesRegex(AttributeError, + "Could not find code object with path"): + code_object_pickler._get_code_from_stable_reference( + "apache_beam.internal.test_cases.before_module_with_lambdas." + "my_function.__code__.co_consts[<lambda>, ('',), 1234567890]") + + def test_get_code_add_local_variable_in_class_objects_match(self): Review Comment: Test method name suggestions: ``` test_adding_local_variable_in_class_preserves_path test_removing_local_variable_in_class_preserves_path ... ``` What do you think? 
########## sdks/python/apache_beam/internal/code_object_pickler_test.py: ########## @@ -0,0 +1,211 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import hashlib +import unittest + +from parameterized import parameterized + +from apache_beam.internal import code_object_pickler + + +def top_level_function(): + return 1 + + +top_level_lambda = lambda x: 1 + + +def get_nested_function(): + def nested_function(): + return 1 + + return nested_function + + +def get_lambda_from_dictionary(): + d = {"a": lambda x: 1, "b": lambda y: 2} + return d["a"] + + +def get_lambda_from_dictionary_same_args(): + d = {"a": lambda x: 1, "b": lambda x: x + 1} + return d["a"] + + +def function_with_lambda_default_argument(fn=lambda x: 1): + return fn + + +def function_with_function_default_argument(fn=top_level_function): + return fn + + +def function_decorator(f): + return lambda x: f(f(x)) + + +@function_decorator +def add_one(x): + return x + 1 + + +class ClassWithFunction: + def process(self): + return 1 + + +class ClassWithStaticMethod: + @staticmethod + def static_method(): + return 1 + + +class ClassWithClassMethod: + @classmethod + def class_method(cls): + return 1 + + +class ClassWithNestedFunction: + def process(self): + def 
nested_function(): + return 1 + + return nested_function + + +class ClassWithLambda: + def process(self): + return lambda: 1 + + +class ClassWithNestedClass: + class InnerClass: + def process(self): + return 1 + + +class ClassWithNestedLambda: + def process(self): + def get_lambda_from_dictionary(): + d = {"a": lambda x: 1, "b": lambda y: 2} + return d["a"] + + return get_lambda_from_dictionary() + + +test_cases = [ + ( + top_level_function, + "apache_beam.internal.code_object_pickler_test.top_level_function" + ".__code__"), + ( + top_level_lambda, + "apache_beam.internal.code_object_pickler_test.top_level_lambda" + ".__code__"), + ( + get_nested_function(), + ( + "apache_beam.internal.code_object_pickler_test.get_nested_function" + ".__code__.co_consts[nested_function]")), + ( + get_lambda_from_dictionary(), + ( + "apache_beam.internal.code_object_pickler_test" + ".get_lambda_from_dictionary.__code__.co_consts[<lambda>, ('x',)]") + ), + ( + get_lambda_from_dictionary_same_args(), + ( + "apache_beam.internal.code_object_pickler_test" + ".get_lambda_from_dictionary_same_args.__code__.co_consts" + "[<lambda>, ('x',), " + hashlib.md5( + get_lambda_from_dictionary_same_args().__code__.co_code). 
+ hexdigest() + "]")), + ( + function_with_lambda_default_argument(), + ( + "apache_beam.internal.code_object_pickler_test" + ".function_with_lambda_default_argument.__defaults__[0].__code__")), + ( + function_with_function_default_argument(), + "apache_beam.internal.code_object_pickler_test.top_level_function" + ".__code__"), + ( + add_one, + "apache_beam.internal.code_object_pickler_test.function_decorator" + ".__code__.co_consts[<lambda>]"), + ( + ClassWithFunction.process, + "apache_beam.internal.code_object_pickler_test.ClassWithFunction" + ".process.__code__"), + ( + ClassWithStaticMethod.static_method, + "apache_beam.internal.code_object_pickler_test.ClassWithStaticMethod" + ".static_method.__code__"), + ( + ClassWithClassMethod.class_method, + "apache_beam.internal.code_object_pickler_test.ClassWithClassMethod" + ".class_method.__code__"), + ( + ClassWithNestedFunction().process(), + ( + "apache_beam.internal.code_object_pickler_test" + ".ClassWithNestedFunction.process.__code__.co_consts" + "[nested_function]")), + ( + ClassWithLambda().process(), + "apache_beam.internal.code_object_pickler_test.ClassWithLambda.process" + ".__code__.co_consts[<lambda>]"), + ( + ClassWithNestedClass.InnerClass().process, + "apache_beam.internal.code_object_pickler_test.ClassWithNestedClass" + ".InnerClass.process.__code__"), + ( + ClassWithNestedLambda().process(), + ( + "apache_beam.internal.code_object_pickler_test" + ".ClassWithNestedLambda.process.__code__.co_consts" + "[get_lambda_from_dictionary].co_consts[<lambda>, ('x',)]")), + ( + ClassWithNestedLambda.process, + "apache_beam.internal.code_object_pickler_test.ClassWithNestedLambda" + ".process.__code__"), +] + + +class CodeObjectPicklerTest(unittest.TestCase): Review Comment: Consider the following name: ``` class CodePathGenerationTest(unittest.TestCase): ``` This communicates a bit more about the nature of the test. 
The existing name `CodeObjectPicklerTest` repeats the name of the test file and doesn't add new information, since we already know that this test module tests something from `code_object_pickler.py`. ########## sdks/python/apache_beam/internal/pickle_code_path_test.py: ########## @@ -0,0 +1,421 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+# + +"""Tests for generating stable identifiers to use for Pickle serialization.""" + +import unittest + +# pylint: disable=unused-import +from apache_beam.internal import code_object_pickler +from apache_beam.internal.test_cases import after_module_add_function +from apache_beam.internal.test_cases import after_module_add_lambda_variable +from apache_beam.internal.test_cases import after_module_add_variable +from apache_beam.internal.test_cases import after_module_remove_lambda_variable +from apache_beam.internal.test_cases import after_module_remove_variable +from apache_beam.internal.test_cases import after_module_with_classes +from apache_beam.internal.test_cases import after_module_with_global_variable +from apache_beam.internal.test_cases import after_module_with_nested_function +from apache_beam.internal.test_cases import after_module_with_nested_function_2 +from apache_beam.internal.test_cases import after_module_with_single_class +from apache_beam.internal.test_cases import before_module_with_classes +from apache_beam.internal.test_cases import before_module_with_functions +from apache_beam.internal.test_cases import before_module_with_lambdas +from apache_beam.internal.test_cases import module_with_default_argument + + +class CodePathTest(unittest.TestCase): + def test_get_code_from_stable_reference_empty_path_raises_exception(self): + with self.assertRaisesRegex(ValueError, "Path must not be empty"): + code_object_pickler._get_code_from_stable_reference("") + + def test_get_code_from_stable_reference_invalid_default_index_raises_exception(self): # pylint: disable=line-too-long + with self.assertRaisesRegex(ValueError, "out of bounds"): + code_object_pickler._get_code_from_stable_reference( + "apache_beam.internal.test_cases.module_with_default_argument." 
+ "function_with_lambda_default_argument.__defaults__[1]") + + def test_get_code_from_stable_reference_invalid_single_name_path_raises_exception(self): # pylint: disable=line-too-long + with self.assertRaisesRegex(AttributeError, + "Could not find code object with path"): + code_object_pickler._get_code_from_stable_reference( + "apache_beam.internal.test_cases.before_module_with_lambdas." + "my_function.__code__.co_consts[something]") + + def test_get_code_from_stable_reference_invalid_lambda_with_args_path_raises_exception(self): # pylint: disable=line-too-long + with self.assertRaisesRegex(AttributeError, + "Could not find code object with path"): + code_object_pickler._get_code_from_stable_reference( + "apache_beam.internal.test_cases.before_module_with_lambdas." + "my_function.__code__.co_consts[<lambda>, ('x',)]") + + def test_get_code_from_stable_reference_invalid_lambda_with_hash_path_raises_exception(self): # pylint: disable=line-too-long + with self.assertRaisesRegex(AttributeError, + "Could not find code object with path"): + code_object_pickler._get_code_from_stable_reference( + "apache_beam.internal.test_cases.before_module_with_lambdas." + "my_function.__code__.co_consts[<lambda>, ('',), 1234567890]") + + def test_get_code_add_local_variable_in_class_objects_match(self): Review Comment: Let's group all tests that exercise the 'stability' or `invariance` of the code path generation into a separate test class. For example: `class CodePathStabilityTest` or `CodePathInvarianceTest` (feel free to come up with a name you like). ########## sdks/python/apache_beam/internal/code_object_pickler.py: ########## @@ -15,7 +15,462 @@ # limitations under the License. # +"""Customizations to how Python code objects are pickled. + +This module provides functions for pickling code objects, especially lambdas, +in a consistent way. It addresses issues with non-deterministic pickling by +creating a unique identifier that is invariant to small changes in the source +code. 
+ +The code object identifiers consists of a sequence of the following parts +separated by periods: +- Module names - The name of the module the code object is in +- Class names - The name of a class containing the code object. There can be + multiple of these in the same identifier in the case of nested + classes. +- Function names - The name of the function containing the code object. + There can be multiple of these in the case of nested functions. +- __code__ - Attribute indicating that we are entering the code object of a + function/method. +- __co_consts__[<name>] - The name of the local variable containing the + code object. In the case of lambdas, the name is created by using the + signature of the lambda and hashing the bytecode, as shown below. + +Examples: +- __main__.top_level_function.__code__ +- __main__.ClassWithNestedFunction.process.__code__.co_consts[nested_function] +- __main__.ClassWithNestedLambda.process.__code__.co_consts[ + get_lambda_from_dictionary].co_consts[<lambda>, ('x',)] +- __main__.ClassWithNestedLambda.process.__code__.co_consts[ + <lambda>, ('x',), 1234567890] +""" + +import collections +import hashlib +import inspect +import re +import sys +import types +from typing import Union + def get_normalized_path(path): """Returns a normalized path. This function is intended to be overridden.""" return path + + +def get_code_path(callable: types.FunctionType): + """Returns the stable reference to the code object. + + Will be implemented using cloudpickle in a future version. + + Args: + callable: The callable object to search for. + + Returns: + The stable reference to the code object. 
+ Examples: + - __main__.top_level_function.__code__ + - __main__.ClassWithNestedFunction.process.__code__.co_consts[ + nested_function] + - __main__.ClassWithNestedLambda.process.__code__.co_consts[ + get_lambda_from_dictionary].co_consts[<lambda>, ('x',)] + - __main__.ClassWithNestedLambda.process.__code__.co_consts[ + <lambda>, ('x',), 1234567890] + """ + if not hasattr(callable, '__module__') or not hasattr(callable, + '__qualname__'): + return None + code_path = _extend_path( + callable.__module__, + _search( + callable, + sys.modules[callable.__module__], + callable.__qualname__.split('.'), + ), + ) + return code_path + + +def _extend_path(prefix: str, suffix: str): + """Extends the path to the code object. + + Args: + prefix: The prefix of the path. + suffix: The rest of the path. + + Returns: + The extended path. + """ + if suffix is None: + return None + if not suffix: + return prefix + return prefix + '.' + suffix + + +def _search( + callable: types.FunctionType, + node: Union[types.ModuleType, types.FunctionType, types.CodeType], + qual_name_parts: list[str]): + """Searches an object to create a stable reference code path. + + Recursively searches the tree of objects starting from node to find the + callable's code object. It uses qual_name_parts to navigate through + attributes. Special components like '<locals>' and '<lambda>' direct the + search within nested code objects. + + + Example of qual_name_parts: ['MyClass', 'process', '<locals>', '<lambda>'] + + Args: + callable: The callable object to search for. + node: The object to search within. + qual_name_parts: A list of strings representing the qualified name of the + callable object. + + Returns: + The stable reference to the code object, or None if not found. 
+ """ + if node is None: + return None + if not qual_name_parts: + if (hasattr(node, '__code__') and hasattr(callable, '__code__') and + node.__code__ == callable.__code__): + return '__code__' + else: + return None + if inspect.ismodule(node) or inspect.isclass(node): + return _search_module_or_class(callable, node, qual_name_parts) + elif inspect.isfunction(node): + return _search_function(callable, node, qual_name_parts) + elif inspect.iscode(node): + return _search_code(callable, node, qual_name_parts) + + +def _search_module_or_class( + callable: types.FunctionType, + node: types.ModuleType, + qual_name_parts: list[str]): + """Searches a module or class to create a stable reference code path. + + Args: + callable: The callable object to search for. + node: The module or class to search within. + qual_name_parts: The list of qual name parts. + + Returns: + The stable reference to the code object, or None if not found. + """ + # Functions/methods have a name that is unique within a given module or class + # so the traversal can directly lookup function object identified by the name. + # Lambdas don't have a name so we need to search all the attributes of the + # node. + first_part = qual_name_parts[0] + rest = qual_name_parts[1:] + if first_part == '<lambda>': + for name in dir(node): + value = getattr(node, name) + if (hasattr(callable, '__code__') and + isinstance(value, type(callable)) and + value.__code__ == callable.__code__): + return name + '.__code__' + elif (isinstance(value, types.FunctionType) and + value.__defaults__ is not None): + # Python functions can have other functions as default parameters which + # might contain the code object so we have to search them. 
+ for i, default_param_value in enumerate(value.__defaults__): + path = _search(callable, default_param_value, rest) + if path is not None: + return _extend_path(name, _extend_path(f'__defaults__[{i}]', path)) + else: + return _extend_path( + first_part, _search(callable, getattr(node, first_part), rest)) + + +def _search_function( + callable: types.FunctionType, + node: types.FunctionType, + qual_name_parts: list[str]): + """Searches a function to create a stable reference code path. + + Args: + callable: The callable object to search for. + node: The function to search within. + qual_name_parts: The list of qual name parts. + + Returns: + The stable reference to the code object, or None if not found. + """ + first_part = qual_name_parts[0] + if (node.__code__ == callable.__code__): + if len(qual_name_parts) > 1: + raise ValueError('Qual name parts too long') + return '__code__' + # If first part is '<locals>' then the code object is in a local variable + # so we should add __code__ to the path to indicate that we are entering + # the code object of the function. + if first_part == '<locals>': + return _extend_path( + '__code__', _search(callable, node.__code__, qual_name_parts)) + + +def _search_code( + callable: types.FunctionType, + node: types.CodeType, + qual_name_parts: list[str]): + """Searches a code object to create a stable reference code path. + + Args: + callable: The callable to search for. + node: The code object to search within. + qual_name_parts: The list of qual name parts. + + Returns: + The stable reference to the code object, or None if not found. + + Raises: + ValueError: If the qual name parts are too long. 
+ """ + first_part = qual_name_parts[0] + rest = qual_name_parts[1:] + if hasattr(callable, '__code__') and node == callable.__code__: + if len(qual_name_parts) > 1: + raise ValueError('Qual name parts too long') + return '' + elif first_part == '<locals>': + code_objects_by_name = collections.defaultdict(list) + for co_const in node.co_consts: + if inspect.iscode(co_const): + code_objects_by_name[co_const.co_name].append(co_const) + num_lambdas = len(code_objects_by_name.get('<lambda>', [])) + # If there is only one lambda, we can use the default path + # 'co_consts[<lambda>]'. This is the most common case and it is + # faster than calculating the signature and the hash. + if num_lambdas == 1: + path = _search(callable, code_objects_by_name['<lambda>'][0], rest) + if path is not None: + return _extend_path('co_consts[<lambda>]', path) + else: + return _search_lambda(callable, code_objects_by_name, rest) + elif node.co_name == first_part: + return _search(callable, node, rest) + + +def _search_lambda( + callable: types.FunctionType, + code_objects_by_name: dict[str, list[types.CodeType]], + qual_name_parts: list[str]): + """Searches a lambda to create a stable reference code path. + + Args: + callable: The callable to search for. + code_objects_by_name: The code objects to search within, keyed by name. + qual_name_parts: The rest of the qual_name_parts. + + Returns: + The stable reference to the code object, or None if not found. + """ + # There are multiple lambdas in the code object, so we need to calculate + # the signature and the hash to identify the correct lambda. + lambda_code_objects_by_name = collections.defaultdict(list) + name = qual_name_parts[0] + code_objects = code_objects_by_name[name] + if name == '<lambda>': + for code_object in code_objects: + lambda_name = f'<lambda>, {_signature(code_object)}' + lambda_code_objects_by_name[lambda_name].append(code_object) + # Check if there are any lambdas with the same signature. 
+ # If there are, we need to calculate the hash to identify the correct + # lambda. + for lambda_name, lambda_objects in lambda_code_objects_by_name.items(): + if len(lambda_objects) > 1: + for lambda_object in lambda_objects: + path = _search(callable, lambda_object, qual_name_parts) + if path is not None: + return _extend_path( + f'co_consts[{lambda_name},' + f' {_create_bytecode_hash(lambda_object)}]', + path, + ) + else: + # If there is only one lambda with this signature, we can + # use the signature to identify the correct lambda. + path = _search(callable, code_objects[0], qual_name_parts) + if path is not None: + return _extend_path(f'co_consts[{lambda_name}]', path) + else: + # For non lambda objects, we can use the name to identify the object. + path = _search(callable, code_objects[0], qual_name_parts) + if path is not None: + return _extend_path(f'co_consts[{name}]', path) + + +# Matches a path like: co_consts[my_function] +_SINGLE_NAME_PATTERN = re.compile(r'co_consts\[([a-zA-Z0-9\<\>_-]+)]') +# Matches a path like: co_consts[<lambda>, ('x',)] +_LAMBDA_WITH_ARGS_PATTERN = re.compile( + r"co_consts\[(<[^>]+>),\s*(\('[^']*'\s*,\s*\))\]") +# Matches a path like: co_consts[<lambda>, ('x',), 1234567890] +_LAMBDA_WITH_HASH_PATTERN = re.compile( + r"co_consts\[(<[^>]+>),\s*(\('[^']*'\s*,\s*\)),\s*(.+)\]") +# Matches a path like: __defaults__[0] +_DEFAULT_PATTERN = re.compile(r'(__defaults__)\[(\d+)\]') +# Matches an argument like: 'x' +_ARGUMENT_PATTERN = re.compile(r"'([^']*)'") + + +def _get_code_object_from_single_name_pattern( + obj: types.ModuleType, name_result: re.Match[str], path: str): + """Returns the code object from a name pattern. + + Args: + obj: The object to search within. + name_result: The result of the name pattern search. + path: The path to the code object. + + Returns: + The code object. + + Raises: + ValueError: If the pattern is invalid. + AttributeError: If the code object is not found. 
+ """ + if len(name_result.groups()) > 1: + raise ValueError(f'Invalid pattern for single name: {name_result.group(0)}') + # Groups are indexed starting at 1, group(0) is the entire match. + name = name_result.group(1) + for co_const in obj.co_consts: + if inspect.iscode(co_const) and co_const.co_name == name: + return co_const + raise AttributeError(f'Could not find code object with path: {path}') + + +def _get_code_object_from_lambda_with_args_pattern( + obj: types.ModuleType, lambda_with_args_result: re.Match[str], path: str): + """Returns the code object from a lambda with args pattern. + + Args: + obj: The object to search within. + lambda_with_args_result: The result of the lambda with args pattern search. + path: The path to the code object. + + Returns: + The code object. + + Raises: + AttributeError: If the code object is not found. + """ + name = lambda_with_args_result.group(1) + code_objects = collections.defaultdict(list) + for co_const in obj.co_consts: + if inspect.iscode(co_const) and co_const.co_name == name: + code_objects[co_const.co_name].append(co_const) + for name, objects in code_objects.items(): + for obj_ in objects: + args = tuple( + re.findall(_ARGUMENT_PATTERN, lambda_with_args_result.group(2))) + if obj_.co_varnames == args: + return obj_ + raise AttributeError(f'Could not find code object with path: {path}') + + +def _get_code_object_from_lambda_with_hash_pattern( + obj: types.ModuleType, lambda_with_hash_result: re.Match[str], path: str): + """Returns the code object from a lambda with hash pattern. + + Args: + obj: The object to search within. + lambda_with_hash_result: The result of the lambda with hash pattern search. + path: The path to the code object. + + Returns: + The code object. + + Raises: + AttributeError: If the code object is not found. 
+ """ + name = lambda_with_hash_result.group(1) + code_objects = collections.defaultdict(list) + for co_const in obj.co_consts: + if inspect.iscode(co_const) and co_const.co_name == name: + code_objects[co_const.co_name].append(co_const) + for name, objects in code_objects.items(): + for obj_ in objects: + args = tuple( + re.findall(_ARGUMENT_PATTERN, lambda_with_hash_result.group(2))) + if obj_.co_varnames == args: + hash_value = lambda_with_hash_result.group(3) + if hash_value == str(_create_bytecode_hash(obj_)): + return obj_ + raise AttributeError(f'Could not find code object with path: {path}') + + +def _get_code_from_stable_reference(path: str): Review Comment: Looks like this function is also a top level helper that will be called from other modules, so it shouldn't have the leading underscore. ```suggestion def get_code_from_stable_reference(path: str): ``` ########## sdks/python/apache_beam/internal/code_object_pickler.py: ########## @@ -15,7 +15,462 @@ # limitations under the License. # +"""Customizations to how Python code objects are pickled. + +This module provides functions for pickling code objects, especially lambdas, +in a consistent way. It addresses issues with non-deterministic pickling by +creating a unique identifier that is invariant to small changes in the source +code. + +The code object identifiers consists of a sequence of the following parts +separated by periods: +- Module names - The name of the module the code object is in +- Class names - The name of a class containing the code object. There can be + multiple of these in the same identifier in the case of nested + classes. +- Function names - The name of the function containing the code object. + There can be multiple of these in the case of nested functions. +- __code__ - Attribute indicating that we are entering the code object of a + function/method. +- __co_consts__[<name>] - The name of the local variable containing the + code object. 
In the case of lambdas, the name is created by using the + signature of the lambda and hashing the bytecode, as shown below. + +Examples: +- __main__.top_level_function.__code__ +- __main__.ClassWithNestedFunction.process.__code__.co_consts[nested_function] +- __main__.ClassWithNestedLambda.process.__code__.co_consts[ + get_lambda_from_dictionary].co_consts[<lambda>, ('x',)] +- __main__.ClassWithNestedLambda.process.__code__.co_consts[ + <lambda>, ('x',), 1234567890] +""" + +import collections +import hashlib +import inspect +import re +import sys +import types +from typing import Union + def get_normalized_path(path): """Returns a normalized path. This function is intended to be overridden.""" return path + + +def get_code_path(callable: types.FunctionType): + """Returns the stable reference to the code object. + + Will be implemented using cloudpickle in a future version. Review Comment: > Will be implemented using cloudpickle in a future version. Do you mean implemented using cloudpickle or integrated with cloudpickle? How about we do one of the following (pick the option that makes the most sense): a) Remove this comment line and instead modify the module-level docstring to something like: `This module provides helper functions to improve pickling code objects, especially lambdas, in a more consistent way by using code object identifiers. These helper functions will be used to patch pickler implementations used by Beam (e.g.
Cloudpickle).` b) Change this line into a TODO and file a GitHub issue that describes the TODO, then add something like: `# TODO (https://github.com/apache/beam/issues/23): Integrate get_code_path with cloudpickle` ########## sdks/python/apache_beam/internal/test_cases/__init__.py: ########## @@ -0,0 +1,18 @@ +# Review Comment: Let's rename test_cases to test_data. ########## sdks/python/apache_beam/internal/pickle_code_path_test.py: ########## @@ -0,0 +1,421 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Tests for generating stable identifiers to use for Pickle serialization.""" Review Comment: I think we could combine both test files into one. Consider moving this content into code_object_pickler_test and moving this docstring there.
########## sdks/python/apache_beam/internal/test_cases/after_module_add_function.py: ########## @@ -0,0 +1,33 @@ +# Review Comment: instead of `before...` `after...`, how about we name the modules like so: `module_<X> `, `module_<X>_modified` for example: `module_with_nested_functions` `module_with_nested_functions_modified` Or, perhaps like so, if you would like to describe the modifications: `module_1` `module_1_function_added` ########## sdks/python/apache_beam/internal/code_object_pickler.py: ########## @@ -15,7 +15,462 @@ # limitations under the License. # +"""Customizations to how Python code objects are pickled. + +This module provides functions for pickling code objects, especially lambdas, +in a consistent way. It addresses issues with non-deterministic pickling by +creating a unique identifier that is invariant to small changes in the source +code. + +The code object identifiers consists of a sequence of the following parts +separated by periods: +- Module names - The name of the module the code object is in +- Class names - The name of a class containing the code object. There can be + multiple of these in the same identifier in the case of nested + classes. +- Function names - The name of the function containing the code object. + There can be multiple of these in the case of nested functions. +- __code__ - Attribute indicating that we are entering the code object of a + function/method. +- __co_consts__[<name>] - The name of the local variable containing the + code object. In the case of lambdas, the name is created by using the + signature of the lambda and hashing the bytecode, as shown below. 
+ +Examples: +- __main__.top_level_function.__code__ +- __main__.ClassWithNestedFunction.process.__code__.co_consts[nested_function] +- __main__.ClassWithNestedLambda.process.__code__.co_consts[ + get_lambda_from_dictionary].co_consts[<lambda>, ('x',)] +- __main__.ClassWithNestedLambda.process.__code__.co_consts[ + <lambda>, ('x',), 1234567890] +""" + +import collections +import hashlib +import inspect +import re +import sys +import types +from typing import Union + def get_normalized_path(path): """Returns a normalized path. This function is intended to be overridden.""" return path + + +def get_code_path(callable: types.FunctionType): + """Returns the stable reference to the code object. + + Will be implemented using cloudpickle in a future version. + + Args: + callable: The callable object to search for. + + Returns: + The stable reference to the code object. + Examples: + - __main__.top_level_function.__code__ + - __main__.ClassWithNestedFunction.process.__code__.co_consts[ + nested_function] + - __main__.ClassWithNestedLambda.process.__code__.co_consts[ + get_lambda_from_dictionary].co_consts[<lambda>, ('x',)] + - __main__.ClassWithNestedLambda.process.__code__.co_consts[ + <lambda>, ('x',), 1234567890] + """ + if not hasattr(callable, '__module__') or not hasattr(callable, + '__qualname__'): + return None + code_path = _extend_path( + callable.__module__, + _search( + callable, + sys.modules[callable.__module__], + callable.__qualname__.split('.'), + ), + ) + return code_path + + +def _extend_path(prefix: str, suffix: str): + """Extends the path to the code object. + + Args: + prefix: The prefix of the path. + suffix: The rest of the path. + + Returns: + The extended path. + """ + if suffix is None: + return None + if not suffix: + return prefix + return prefix + '.' 
+ suffix + + +def _search( + callable: types.FunctionType, + node: Union[types.ModuleType, types.FunctionType, types.CodeType], + qual_name_parts: list[str]): + """Searches an object to create a stable reference code path. + + Recursively searches the tree of objects starting from node to find the + callable's code object. It uses qual_name_parts to navigate through + attributes. Special components like '<locals>' and '<lambda>' direct the + search within nested code objects. + + + Example of qual_name_parts: ['MyClass', 'process', '<locals>', '<lambda>'] + + Args: + callable: The callable object to search for. + node: The object to search within. + qual_name_parts: A list of strings representing the qualified name of the + callable object. + + Returns: + The stable reference to the code object, or None if not found. + """ + if node is None: + return None + if not qual_name_parts: + if (hasattr(node, '__code__') and hasattr(callable, '__code__') and + node.__code__ == callable.__code__): + return '__code__' + else: + return None + if inspect.ismodule(node) or inspect.isclass(node): + return _search_module_or_class(callable, node, qual_name_parts) + elif inspect.isfunction(node): + return _search_function(callable, node, qual_name_parts) + elif inspect.iscode(node): + return _search_code(callable, node, qual_name_parts) + + +def _search_module_or_class( + callable: types.FunctionType, + node: types.ModuleType, + qual_name_parts: list[str]): + """Searches a module or class to create a stable reference code path. + + Args: + callable: The callable object to search for. + node: The module or class to search within. + qual_name_parts: The list of qual name parts. + + Returns: + The stable reference to the code object, or None if not found. + """ + # Functions/methods have a name that is unique within a given module or class + # so the traversal can directly lookup function object identified by the name. 
+ # Lambdas don't have a name so we need to search all the attributes of the + # node. + first_part = qual_name_parts[0] + rest = qual_name_parts[1:] + if first_part == '<lambda>': + for name in dir(node): + value = getattr(node, name) + if (hasattr(callable, '__code__') and + isinstance(value, type(callable)) and + value.__code__ == callable.__code__): + return name + '.__code__' + elif (isinstance(value, types.FunctionType) and + value.__defaults__ is not None): + # Python functions can have other functions as default parameters which + # might contain the code object so we have to search them. + for i, default_param_value in enumerate(value.__defaults__): + path = _search(callable, default_param_value, rest) + if path is not None: + return _extend_path(name, _extend_path(f'__defaults__[{i}]', path)) + else: + return _extend_path( + first_part, _search(callable, getattr(node, first_part), rest)) + + +def _search_function( + callable: types.FunctionType, + node: types.FunctionType, + qual_name_parts: list[str]): + """Searches a function to create a stable reference code path. + + Args: + callable: The callable object to search for. + node: The function to search within. + qual_name_parts: The list of qual name parts. + + Returns: + The stable reference to the code object, or None if not found. + """ + first_part = qual_name_parts[0] + if (node.__code__ == callable.__code__): + if len(qual_name_parts) > 1: + raise ValueError('Qual name parts too long') + return '__code__' + # If first part is '<locals>' then the code object is in a local variable + # so we should add __code__ to the path to indicate that we are entering + # the code object of the function. + if first_part == '<locals>': + return _extend_path( + '__code__', _search(callable, node.__code__, qual_name_parts)) + + +def _search_code( + callable: types.FunctionType, + node: types.CodeType, + qual_name_parts: list[str]): + """Searches a code object to create a stable reference code path. 
+ + Args: + callable: The callable to search for. + node: The code object to search within. + qual_name_parts: The list of qual name parts. + + Returns: + The stable reference to the code object, or None if not found. + + Raises: + ValueError: If the qual name parts are too long. + """ + first_part = qual_name_parts[0] + rest = qual_name_parts[1:] + if hasattr(callable, '__code__') and node == callable.__code__: + if len(qual_name_parts) > 1: + raise ValueError('Qual name parts too long') + return '' + elif first_part == '<locals>': + code_objects_by_name = collections.defaultdict(list) + for co_const in node.co_consts: + if inspect.iscode(co_const): + code_objects_by_name[co_const.co_name].append(co_const) + num_lambdas = len(code_objects_by_name.get('<lambda>', [])) + # If there is only one lambda, we can use the default path + # 'co_consts[<lambda>]'. This is the most common case and it is + # faster than calculating the signature and the hash. + if num_lambdas == 1: + path = _search(callable, code_objects_by_name['<lambda>'][0], rest) + if path is not None: + return _extend_path('co_consts[<lambda>]', path) + else: + return _search_lambda(callable, code_objects_by_name, rest) + elif node.co_name == first_part: + return _search(callable, node, rest) + + +def _search_lambda( + callable: types.FunctionType, + code_objects_by_name: dict[str, list[types.CodeType]], + qual_name_parts: list[str]): + """Searches a lambda to create a stable reference code path. + + Args: + callable: The callable to search for. + code_objects_by_name: The code objects to search within, keyed by name. + qual_name_parts: The rest of the qual_name_parts. + + Returns: + The stable reference to the code object, or None if not found. + """ + # There are multiple lambdas in the code object, so we need to calculate + # the signature and the hash to identify the correct lambda. 
+ lambda_code_objects_by_name = collections.defaultdict(list) + name = qual_name_parts[0] + code_objects = code_objects_by_name[name] + if name == '<lambda>': + for code_object in code_objects: + lambda_name = f'<lambda>, {_signature(code_object)}' + lambda_code_objects_by_name[lambda_name].append(code_object) + # Check if there are any lambdas with the same signature. + # If there are, we need to calculate the hash to identify the correct + # lambda. + for lambda_name, lambda_objects in lambda_code_objects_by_name.items(): + if len(lambda_objects) > 1: + for lambda_object in lambda_objects: + path = _search(callable, lambda_object, qual_name_parts) + if path is not None: + return _extend_path( + f'co_consts[{lambda_name},' + f' {_create_bytecode_hash(lambda_object)}]', + path, + ) + else: + # If there is only one lambda with this signature, we can + # use the signature to identify the correct lambda. + path = _search(callable, code_objects[0], qual_name_parts) + if path is not None: + return _extend_path(f'co_consts[{lambda_name}]', path) + else: + # For non lambda objects, we can use the name to identify the object. 
+ path = _search(callable, code_objects[0], qual_name_parts) + if path is not None: + return _extend_path(f'co_consts[{name}]', path) + + +# Matches a path like: co_consts[my_function] +_SINGLE_NAME_PATTERN = re.compile(r'co_consts\[([a-zA-Z0-9\<\>_-]+)]') +# Matches a path like: co_consts[<lambda>, ('x',)] +_LAMBDA_WITH_ARGS_PATTERN = re.compile( + r"co_consts\[(<[^>]+>),\s*(\('[^']*'\s*,\s*\))\]") +# Matches a path like: co_consts[<lambda>, ('x',), 1234567890] +_LAMBDA_WITH_HASH_PATTERN = re.compile( + r"co_consts\[(<[^>]+>),\s*(\('[^']*'\s*,\s*\)),\s*(.+)\]") +# Matches a path like: __defaults__[0] +_DEFAULT_PATTERN = re.compile(r'(__defaults__)\[(\d+)\]') +# Matches an argument like: 'x' +_ARGUMENT_PATTERN = re.compile(r"'([^']*)'") + + +def _get_code_object_from_single_name_pattern( + obj: types.ModuleType, name_result: re.Match[str], path: str): + """Returns the code object from a name pattern. + + Args: + obj: The object to search within. + name_result: The result of the name pattern search. + path: The path to the code object. + + Returns: + The code object. + + Raises: + ValueError: If the pattern is invalid. + AttributeError: If the code object is not found. + """ + if len(name_result.groups()) > 1: + raise ValueError(f'Invalid pattern for single name: {name_result.group(0)}') + # Groups are indexed starting at 1, group(0) is the entire match. + name = name_result.group(1) + for co_const in obj.co_consts: + if inspect.iscode(co_const) and co_const.co_name == name: + return co_const + raise AttributeError(f'Could not find code object with path: {path}') + + +def _get_code_object_from_lambda_with_args_pattern( + obj: types.ModuleType, lambda_with_args_result: re.Match[str], path: str): + """Returns the code object from a lambda with args pattern. + + Args: + obj: The object to search within. + lambda_with_args_result: The result of the lambda with args pattern search. + path: The path to the code object. + + Returns: + The code object. 
+ + Raises: + AttributeError: If the code object is not found. + """ + name = lambda_with_args_result.group(1) + code_objects = collections.defaultdict(list) + for co_const in obj.co_consts: + if inspect.iscode(co_const) and co_const.co_name == name: + code_objects[co_const.co_name].append(co_const) + for name, objects in code_objects.items(): + for obj_ in objects: + args = tuple( + re.findall(_ARGUMENT_PATTERN, lambda_with_args_result.group(2))) + if obj_.co_varnames == args: + return obj_ + raise AttributeError(f'Could not find code object with path: {path}') + + +def _get_code_object_from_lambda_with_hash_pattern( + obj: types.ModuleType, lambda_with_hash_result: re.Match[str], path: str): + """Returns the code object from a lambda with hash pattern. + + Args: + obj: The object to search within. + lambda_with_hash_result: The result of the lambda with hash pattern search. + path: The path to the code object. + + Returns: + The code object. + + Raises: + AttributeError: If the code object is not found. + """ + name = lambda_with_hash_result.group(1) + code_objects = collections.defaultdict(list) + for co_const in obj.co_consts: + if inspect.iscode(co_const) and co_const.co_name == name: + code_objects[co_const.co_name].append(co_const) + for name, objects in code_objects.items(): + for obj_ in objects: + args = tuple( + re.findall(_ARGUMENT_PATTERN, lambda_with_hash_result.group(2))) + if obj_.co_varnames == args: + hash_value = lambda_with_hash_result.group(3) + if hash_value == str(_create_bytecode_hash(obj_)): + return obj_ + raise AttributeError(f'Could not find code object with path: {path}') + + +def _get_code_from_stable_reference(path: str): Review Comment: Also please add a return type hint. ########## sdks/python/apache_beam/internal/code_object_pickler_test.py: ########## @@ -0,0 +1,211 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import hashlib +import unittest + +from parameterized import parameterized + +from apache_beam.internal import code_object_pickler + + +def top_level_function(): + return 1 + + +top_level_lambda = lambda x: 1 + + +def get_nested_function(): + def nested_function(): + return 1 + + return nested_function + + +def get_lambda_from_dictionary(): + d = {"a": lambda x: 1, "b": lambda y: 2} + return d["a"] + + +def get_lambda_from_dictionary_same_args(): + d = {"a": lambda x: 1, "b": lambda x: x + 1} + return d["a"] + + +def function_with_lambda_default_argument(fn=lambda x: 1): + return fn + + +def function_with_function_default_argument(fn=top_level_function): + return fn + + +def function_decorator(f): + return lambda x: f(f(x)) + + +@function_decorator +def add_one(x): + return x + 1 + + +class ClassWithFunction: + def process(self): + return 1 + + +class ClassWithStaticMethod: + @staticmethod + def static_method(): + return 1 + + +class ClassWithClassMethod: + @classmethod + def class_method(cls): + return 1 + + +class ClassWithNestedFunction: + def process(self): + def nested_function(): + return 1 + + return nested_function + + +class ClassWithLambda: + def process(self): + return lambda: 1 + + +class ClassWithNestedClass: + class InnerClass: + def process(self): + return 1 + + +class 
ClassWithNestedLambda: + def process(self): + def get_lambda_from_dictionary(): + d = {"a": lambda x: 1, "b": lambda y: 2} + return d["a"] + + return get_lambda_from_dictionary() + + +test_cases = [ + ( + top_level_function, + "apache_beam.internal.code_object_pickler_test.top_level_function" + ".__code__"), + ( + top_level_lambda, + "apache_beam.internal.code_object_pickler_test.top_level_lambda" + ".__code__"), + ( + get_nested_function(), + ( + "apache_beam.internal.code_object_pickler_test.get_nested_function" + ".__code__.co_consts[nested_function]")), + ( + get_lambda_from_dictionary(), + ( + "apache_beam.internal.code_object_pickler_test" + ".get_lambda_from_dictionary.__code__.co_consts[<lambda>, ('x',)]") + ), + ( + get_lambda_from_dictionary_same_args(), + ( + "apache_beam.internal.code_object_pickler_test" + ".get_lambda_from_dictionary_same_args.__code__.co_consts" + "[<lambda>, ('x',), " + hashlib.md5( + get_lambda_from_dictionary_same_args().__code__.co_code). + hexdigest() + "]")), + ( + function_with_lambda_default_argument(), + ( + "apache_beam.internal.code_object_pickler_test" + ".function_with_lambda_default_argument.__defaults__[0].__code__")), + ( + function_with_function_default_argument(), + "apache_beam.internal.code_object_pickler_test.top_level_function" + ".__code__"), + ( + add_one, + "apache_beam.internal.code_object_pickler_test.function_decorator" + ".__code__.co_consts[<lambda>]"), + ( + ClassWithFunction.process, + "apache_beam.internal.code_object_pickler_test.ClassWithFunction" + ".process.__code__"), + ( + ClassWithStaticMethod.static_method, + "apache_beam.internal.code_object_pickler_test.ClassWithStaticMethod" + ".static_method.__code__"), + ( + ClassWithClassMethod.class_method, + "apache_beam.internal.code_object_pickler_test.ClassWithClassMethod" + ".class_method.__code__"), + ( + ClassWithNestedFunction().process(), + ( + "apache_beam.internal.code_object_pickler_test" + 
".ClassWithNestedFunction.process.__code__.co_consts" + "[nested_function]")), + ( + ClassWithLambda().process(), + "apache_beam.internal.code_object_pickler_test.ClassWithLambda.process" + ".__code__.co_consts[<lambda>]"), + ( + ClassWithNestedClass.InnerClass().process, + "apache_beam.internal.code_object_pickler_test.ClassWithNestedClass" + ".InnerClass.process.__code__"), + ( + ClassWithNestedLambda().process(), + ( + "apache_beam.internal.code_object_pickler_test" + ".ClassWithNestedLambda.process.__code__.co_consts" + "[get_lambda_from_dictionary].co_consts[<lambda>, ('x',)]")), + ( + ClassWithNestedLambda.process, + "apache_beam.internal.code_object_pickler_test.ClassWithNestedLambda" + ".process.__code__"), +] + + +class CodeObjectPicklerTest(unittest.TestCase): + @parameterized.expand(test_cases) + def test_get_code_path(self, callable, expected): Review Comment: Re the second argument (`expected`): let's try to be consistent in the function signatures and use the same naming convention for arguments across all test methods. In every test method the last argument receives the path from the test case tuple, so you could use 'path' in its name in all scenarios. For example: ``` def test_get_code_path(self, callable, expected_path): def test_get_code_path(self, expected_callable, path): def test_roundtrip(self, callable, unused_path): ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
