This is an automated email from the ASF dual-hosted git repository.

anandinguva pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new cf438d3141c Fetch buffer and add it to pcoll_buffer to avoid element duplication when buffer is None (#27676)
cf438d3141c is described below

commit cf438d3141c655cc5162a2f2df92d1aa651bd9df
Author: Anand Inguva <34158215+ananding...@users.noreply.github.com>
AuthorDate: Wed Jul 26 22:45:24 2023 -0400

    Fetch buffer and add it to pcoll_buffer to avoid element duplication when buffer is None (#27676)
    
    * Fetch buffer and add it to pcoll_buffer to avoid duplication
    
    * Add buffer_id to buffers_to_clean
    
    * Move test
---
 CHANGES.md                                                     |  2 +-
 .../apache_beam/runners/portability/fn_api_runner/fn_runner.py | 10 +++++++++-
 .../runners/portability/fn_api_runner/fn_runner_test.py        |  9 +++++++++
 3 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index ec1c112ff4d..231a41774f0 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -79,7 +79,7 @@
 
 ## Bugfixes
 
-* Fixed DirectRunner bug in Python SDK where GroupByKey gets empty PCollection and fails when pipeline option `direct_num_workers!=1`. ([#27373](https://github.com/apache/beam/pull/27373))
+* Fixed DirectRunner bug in Python SDK where GroupByKey gets empty PCollection and fails when pipeline option `direct_num_workers!=1`.([#27373](https://github.com/apache/beam/pull/27373))
 * Fixed BigQuery I/O bug when estimating size on queries that utilize row-level security ([#27474](https://github.com/apache/beam/pull/27474))
 * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
 
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
index 8d957068d08..098e16933b7 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
@@ -825,7 +825,8 @@ class FnApiRunner(runner.PipelineRunner):
 
     buffers_to_clean = set()
     known_consumers = set()
-    for _, buffer_id in bundle_context_manager.stage_data_outputs.items():
+    for transform_id, buffer_id in (
+      bundle_context_manager.stage_data_outputs.items()):
       for (consuming_stage_name, consuming_transform) in \
           runner_execution_context.buffer_id_to_consumer_pairs.get(buffer_id,
                                                                    []):
@@ -841,6 +842,13 @@ class FnApiRunner(runner.PipelineRunner):
           runner_execution_context.pcoll_buffers[buffer_id] = buffer.copy()
           buffer = runner_execution_context.pcoll_buffers[buffer_id]
 
+        # empty buffer. Add it to the pcoll_buffer to avoid element
+        # duplication.
+        if buffer_id not in runner_execution_context.pcoll_buffers:
+          buffer = bundle_context_manager.get_buffer(buffer_id, transform_id)
+          runner_execution_context.pcoll_buffers[buffer_id] = buffer
+          buffers_to_clean.add(buffer_id)
+
         # If the buffer has already been added to be consumed by
         # (stage, transform), then we don't need to add it again. This case
         # can happen whenever we flatten the same PCollection with itself.
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
index ed09bb8f223..40887c350dd 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
@@ -1375,6 +1375,15 @@ class FnApiRunnerTest(unittest.TestCase):
   def test_pack_combiners(self):
     self._test_pack_combiners(assert_using_counter_names=True)
 
+  def test_group_by_key_with_empty_pcoll_elements(self):
+    with self.create_pipeline() as p:
+      res = (
+          p
+          | beam.Create([('test_key', 'test_value')])
+          | beam.Filter(lambda x: False)
+          | beam.GroupByKey())
+      assert_that(res, equal_to([]))
+
 
 # These tests are kept in a separate group so that they are
 # not ran in the FnApiRunnerTestWithBundleRepeat which repeats

Reply via email to