This is an automated email from the ASF dual-hosted git repository. tvalentyn pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new fbeba899071 Revert "Deepcopy combine_fn in PrecombineFn and PostCombineFn." (#32634) fbeba899071 is described below commit fbeba899071acadaa7eba6066a5eb6ed67a2d9f2 Author: claudevdm <33973061+claude...@users.noreply.github.com> AuthorDate: Wed Oct 2 15:57:37 2024 -0400 Revert "Deepcopy combine_fn in PrecombineFn and PostCombineFn." (#32634) This reverts commit eaf53e531b963fe9fc43fb3bd95809cc3c22fe66. --- .../transforms/combinefn_lifecycle_test.py | 11 ++---- sdks/python/apache_beam/transforms/core.py | 43 +++++++++------------- 2 files changed, 22 insertions(+), 32 deletions(-) diff --git a/sdks/python/apache_beam/transforms/combinefn_lifecycle_test.py b/sdks/python/apache_beam/transforms/combinefn_lifecycle_test.py index 2a86f0251e7..62dbbc5fb77 100644 --- a/sdks/python/apache_beam/transforms/combinefn_lifecycle_test.py +++ b/sdks/python/apache_beam/transforms/combinefn_lifecycle_test.py @@ -53,18 +53,15 @@ class CombineFnLifecycleTest(unittest.TestCase): @parameterized_class([ - {'runner': direct_runner.BundleBasedDirectRunner, 'pickler': 'dill'}, - {'runner': direct_runner.BundleBasedDirectRunner, 'pickler': 'cloudpickle'}, - {'runner': fn_api_runner.FnApiRunner, 'pickler': 'dill'}, - {'runner': fn_api_runner.FnApiRunner, 'pickler': 'cloudpickle'}, - ]) # yapf: disable + {'runner': direct_runner.BundleBasedDirectRunner}, + {'runner': fn_api_runner.FnApiRunner}, +]) # yapf: disable class LocalCombineFnLifecycleTest(unittest.TestCase): def tearDown(self): CallSequenceEnforcingCombineFn.instances.clear() def test_combine(self): - test_options = PipelineOptions(flags=[f"--pickle_library={self.pickler}"]) - run_combine(TestPipeline(runner=self.runner(), options=test_options)) + run_combine(TestPipeline(runner=self.runner())) self._assert_teardown_called() def test_non_liftable_combine(self): diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 953b6cfe627..e7180bc093b 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -3158,40 +3158,33 @@ class _CombinePerKeyWithHotKeyFanout(PTransform): yield pvalue.TaggedOutput('hot', ((self._nonce % fanout, key), value)) class PreCombineFn(CombineFn): - def __init__(self): - # Deepcopy of the combine_fn to avoid sharing state between lifted - # stages when using cloudpickle. - self._combine_fn_copy = copy.deepcopy(combine_fn) - self.setup = self._combine_fn_copy.setup - self.create_accumulator = self._combine_fn_copy.create_accumulator - self.add_input = self._combine_fn_copy.add_input - self.merge_accumulators = self._combine_fn_copy.merge_accumulators - self.compact = self._combine_fn_copy.compact - self.teardown = self._combine_fn_copy.teardown - @staticmethod def extract_output(accumulator): # Boolean indicates this is an accumulator. return (True, accumulator) + setup = combine_fn.setup + create_accumulator = combine_fn.create_accumulator + add_input = combine_fn.add_input + merge_accumulators = combine_fn.merge_accumulators + compact = combine_fn.compact + teardown = combine_fn.teardown + class PostCombineFn(CombineFn): - def __init__(self): - # Deepcopy of the combine_fn to avoid sharing state between lifted - # stages when using cloudpickle. - self._combine_fn_copy = copy.deepcopy(combine_fn) - self.setup = self._combine_fn_copy.setup - self.create_accumulator = self._combine_fn_copy.create_accumulator - self.merge_accumulators = self._combine_fn_copy.merge_accumulators - self.compact = self._combine_fn_copy.compact - self.extract_output = self._combine_fn_copy.extract_output - self.teardown = self._combine_fn_copy.teardown - - def add_input(self, accumulator, element): + @staticmethod + def add_input(accumulator, element): is_accumulator, value = element if is_accumulator: - return self._combine_fn_copy.merge_accumulators([accumulator, value]) + return combine_fn.merge_accumulators([accumulator, value]) else: - return self._combine_fn_copy.add_input(accumulator, value) + return combine_fn.add_input(accumulator, value) + + setup = combine_fn.setup + create_accumulator = combine_fn.create_accumulator + merge_accumulators = combine_fn.merge_accumulators + compact = combine_fn.compact + extract_output = combine_fn.extract_output + teardown = combine_fn.teardown def StripNonce(nonce_key_value): (_, key), value = nonce_key_value