This is an automated email from the ASF dual-hosted git repository. anandinguva pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new cf438d3141c Fetch buffer and add it to pcoll_buffer to avoid element duplication when buffer is None (#27676) cf438d3141c is described below commit cf438d3141c655cc5162a2f2df92d1aa651bd9df Author: Anand Inguva <34158215+ananding...@users.noreply.github.com> AuthorDate: Wed Jul 26 22:45:24 2023 -0400 Fetch buffer and add it to pcoll_buffer to avoid element duplication when buffer is None (#27676) * Fetch buffer and add it to pcoll_buffer to avoid duplication * Add buffer_id to buffers_to_clean * Move test --- CHANGES.md | 2 +- .../apache_beam/runners/portability/fn_api_runner/fn_runner.py | 10 +++++++++- .../runners/portability/fn_api_runner/fn_runner_test.py | 9 +++++++++ 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index ec1c112ff4d..231a41774f0 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -79,7 +79,7 @@ ## Bugfixes -* Fixed DirectRunner bug in Python SDK where GroupByKey gets empty PCollection and fails when pipeline option `direct_num_workers!=1`. ([#27373](https://github.com/apache/beam/pull/27373)) +* Fixed DirectRunner bug in Python SDK where GroupByKey gets empty PCollection and fails when pipeline option `direct_num_workers!=1`.([#27373](https://github.com/apache/beam/pull/27373)) * Fixed BigQuery I/O bug when estimating size on queries that utilize row-level security ([#27474](https://github.com/apache/beam/pull/27474)) * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py index 8d957068d08..098e16933b7 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py @@ -825,7 +825,8 @@ class FnApiRunner(runner.PipelineRunner): buffers_to_clean = set() known_consumers = set() - for _, buffer_id in bundle_context_manager.stage_data_outputs.items(): + for transform_id, buffer_id in ( + bundle_context_manager.stage_data_outputs.items()): for (consuming_stage_name, consuming_transform) in \ runner_execution_context.buffer_id_to_consumer_pairs.get(buffer_id, []): @@ -841,6 +842,13 @@ class FnApiRunner(runner.PipelineRunner): runner_execution_context.pcoll_buffers[buffer_id] = buffer.copy() buffer = runner_execution_context.pcoll_buffers[buffer_id] + # empty buffer. Add it to the pcoll_buffer to avoid element + # duplication. + if buffer_id not in runner_execution_context.pcoll_buffers: + buffer = bundle_context_manager.get_buffer(buffer_id, transform_id) + runner_execution_context.pcoll_buffers[buffer_id] = buffer + buffers_to_clean.add(buffer_id) + # If the buffer has already been added to be consumed by # (stage, transform), then we don't need to add it again. This case # can happen whenever we flatten the same PCollection with itself. diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py index ed09bb8f223..40887c350dd 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py @@ -1375,6 +1375,15 @@ class FnApiRunnerTest(unittest.TestCase): def test_pack_combiners(self): self._test_pack_combiners(assert_using_counter_names=True) + def test_group_by_key_with_empty_pcoll_elements(self): + with self.create_pipeline() as p: + res = ( + p + | beam.Create([('test_key', 'test_value')]) + | beam.Filter(lambda x: False) + | beam.GroupByKey()) + assert_that(res, equal_to([])) + # These tests are kept in a separate group so that they are # not ran in the FnApiRunnerTestWithBundleRepeat which repeats