This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f582e564fe Add key param support for helper Combiners (#25895)
9f582e564fe is described below

commit 9f582e564fededdd4506157a21bef66ada303529
Author: harrisonlimh <97203667+harrisonl...@users.noreply.github.com>
AuthorDate: Wed Mar 29 14:08:31 2023 -0700

    Add key param support for helper Combiners (#25895)
    
    Co-authored-by: tvalentyn <tvalen...@users.noreply.github.com>
---
 CHANGES.md                                           |  1 +
 sdks/python/apache_beam/transforms/combiners.py      | 20 ++++++++++----------
 sdks/python/apache_beam/transforms/combiners_test.py | 14 ++++++++++++++
 3 files changed, 25 insertions(+), 10 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 0f54d233ad6..5e6c99f4adb 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -77,6 +77,7 @@
 
 * If a main session fails to load, the pipeline will now fail at worker startup. ([#25401](https://github.com/apache/beam/issues/25401)).
 * Python pipeline options will now ignore unparsed command line flags prefixed with a single dash. ([#25943](https://github.com/apache/beam/issues/25943)).
+* The SmallestPerKey combiner now requires keyword-only arguments for specifying optional parameters, such as `key` and `reverse`. ([#25888](https://github.com/apache/beam/issues/25888)).
 
 ## Deprecations
 
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index 7afa4e46e75..0e5f01196e6 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -293,33 +293,33 @@ class Top(object):
 
   @staticmethod
   @ptransform.ptransform_fn
-  def Largest(pcoll, n, has_defaults=True):
+  def Largest(pcoll, n, has_defaults=True, key=None):
     """Obtain a list of the greatest N elements in a PCollection."""
     if has_defaults:
-      return pcoll | Top.Of(n)
+      return pcoll | Top.Of(n, key)
     else:
-      return pcoll | Top.Of(n).without_defaults()
+      return pcoll | Top.Of(n, key).without_defaults()
 
   @staticmethod
   @ptransform.ptransform_fn
-  def Smallest(pcoll, n, has_defaults=True):
+  def Smallest(pcoll, n, has_defaults=True, key=None):
     """Obtain a list of the least N elements in a PCollection."""
     if has_defaults:
-      return pcoll | Top.Of(n, reverse=True)
+      return pcoll | Top.Of(n, key, reverse=True)
     else:
-      return pcoll | Top.Of(n, reverse=True).without_defaults()
+      return pcoll | Top.Of(n, key, reverse=True).without_defaults()
 
   @staticmethod
   @ptransform.ptransform_fn
-  def LargestPerKey(pcoll, n):
+  def LargestPerKey(pcoll, n, key=None):
     """Identifies the N greatest elements associated with each key."""
-    return pcoll | Top.PerKey(n)
+    return pcoll | Top.PerKey(n, key)
 
   @staticmethod
   @ptransform.ptransform_fn
-  def SmallestPerKey(pcoll, n, reverse=True):
+  def SmallestPerKey(pcoll, n, *, key=None, reverse=None):
     """Identifies the N least elements associated with each key."""
-    return pcoll | Top.PerKey(n, reverse=True)
+    return pcoll | Top.PerKey(n, key, reverse=True)
 
 
 @with_input_types(T)
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index 34fb463d00a..385b3332e0c 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -191,6 +191,20 @@ class CombineTest(unittest.TestCase):
                      | combine.Top.Of(3, key=len, reverse=True),
                      [['c', 'aa', 'bbb']])
 
+    self.assertEqual(['xc', 'zb', 'yd', 'wa']
+                     | combine.Top.Largest(3, key=lambda x: x[-1]),
+                     [['yd', 'xc', 'zb']])
+    self.assertEqual(['xc', 'zb', 'yd', 'wa']
+                     | combine.Top.Smallest(3, key=lambda x: x[-1]),
+                     [['wa', 'zb', 'xc']])
+
+    self.assertEqual([('a', x) for x in [1, 2, 3, 4, 1, 1]]
+                     | combine.Top.LargestPerKey(3, key=lambda x: -x),
+                     [('a', [1, 1, 1])])
+    self.assertEqual([('a', x) for x in [1, 2, 3, 4, 1, 1]]
+                     | combine.Top.SmallestPerKey(3, key=lambda x: -x),
+                     [('a', [4, 3, 2])])
+
   def test_sharded_top_combine_fn(self):
     def test_combine_fn(combine_fn, shards, expected):
       accumulators = [

Reply via email to