This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6029a57a4d7 Move CombineValues override to non portable overrides 
(#25106)
6029a57a4d7 is described below

commit 6029a57a4d7399ce5942f000b9828d83588ff5db
Author: Danny McCormick <dannymccorm...@google.com>
AuthorDate: Mon Jan 23 15:41:37 2023 -0500

    Move CombineValues override to non portable overrides (#25106)
    
    * Move CombineValues override to non portable overrides
    
    * Fix extraneously added tab
    
    * Create combinevalues_it_test.py
    
    * License
    
    * Add backslashes
    
    * Format
    
    * Lint
    
    * Fix imports
    
    * Validates runner test
    
    * Spacing
    
    * Update sdks/python/apache_beam/transforms/combiners_test.py
    
    Co-authored-by: Lukasz Cwik <lc...@google.com>
    
    Co-authored-by: Lukasz Cwik <lc...@google.com>
---
 .../apache_beam/runners/dataflow/dataflow_runner.py    |  2 +-
 sdks/python/apache_beam/transforms/combiners_test.py   | 18 ++++++++++++++++++
 2 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py 
b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index a28c971fb0d..4ad1b6e6792 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -111,13 +111,13 @@ class DataflowRunner(PipelineRunner):
   # These overrides should be applied before the proto representation of the
   # graph is created.
   _PTRANSFORM_OVERRIDES = [
-      CombineValuesPTransformOverride(),
       NativeReadPTransformOverride(),
   ]  # type: List[PTransformOverride]
 
   # These overrides should be applied after the proto representation of the
   # graph is created.
   _NON_PORTABLE_PTRANSFORM_OVERRIDES = [
+      CombineValuesPTransformOverride(),
       CreatePTransformOverride(),
       ReadPTransformOverride(),
   ]  # type: List[PTransformOverride]
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py 
b/sdks/python/apache_beam/transforms/combiners_test.py
index 5a15b5989fa..34fb463d00a 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -873,6 +873,24 @@ class LatestCombineFnTest(unittest.TestCase):
         _ = pc | beam.CombineGlobally(self.fn)
 
 
+@pytest.mark.it_validatesrunner
+class CombineValuesTest(unittest.TestCase):
+  def test_gbk_immediately_followed_by_combine(self):
+    def merge(vals):
+      return "".join(vals)
+
+    with TestPipeline() as p:
+      result = (
+          p \
+          | Create([("key1", "foo"), ("key2", "bar"), ("key1", "foo")],
+                    reshuffle=False) \
+          | beam.GroupByKey() \
+          | beam.CombineValues(merge) \
+          | beam.MapTuple(lambda k, v: '{}: {}'.format(k, v)))
+
+      assert_that(result, equal_to(['key1: foofoo', 'key2: bar']))
+
+
 #
 # Test cases for streaming.
 #

Reply via email to