This is an automated email from the ASF dual-hosted git repository.

udim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a4abc91  [BEAM-12434] Implement side_input for num_shards in iobase (#14916)
a4abc91 is described below

commit a4abc91436eadf78abda304664a3e0f64019ea61
Author: hoshimura <johan.ster...@gmail.com>
AuthorDate: Fri Jun 4 19:19:20 2021 +0200

    [BEAM-12434] Implement side_input for num_shards in iobase (#14916)
    
    Co-authored-by: Udi Meiri <u...@users.noreply.github.com>
    Co-authored-by: Johan Sternby <joha...@axis.com>
---
 sdks/python/apache_beam/io/iobase.py | 16 ++++++----------
 1 file changed, 6 insertions(+), 10 deletions(-)

diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 71d8037..5d8e5df 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -1125,7 +1125,7 @@ class WriteImpl(ptransform.PTransform):
       if min_shards == 1:
         keyed_pcoll = pcoll | core.Map(lambda x: (None, x))
       else:
-        keyed_pcoll = pcoll | core.ParDo(_RoundRobinKeyFn(min_shards))
+        keyed_pcoll = pcoll | core.ParDo(_RoundRobinKeyFn(), count=min_shards)
       write_result_coll = (
           keyed_pcoll
           | core.WindowInto(window.GlobalWindows())
@@ -1226,17 +1226,13 @@ def _finalize_write(
 
 
 class _RoundRobinKeyFn(core.DoFn):
-  def __init__(self, count):
-    # type: (int) -> None
-    self.count = count
-
   def start_bundle(self):
-    self.counter = random.randint(0, self.count - 1)
+    self.counter = None
 
-  def process(self, element):
-    self.counter += 1
-    if self.counter >= self.count:
-      self.counter -= self.count
+  def process(self, element, count):
+    if self.counter is None:
+      self.counter = random.randrange(0, count)
+    self.counter = (1 + self.counter) % count
     yield self.counter, element
 
 

Reply via email to