jrmccluskey commented on code in PR #36596:
URL: https://github.com/apache/beam/pull/36596#discussion_r2473272328
##########
sdks/python/apache_beam/transforms/async_dofn.py:
##########
@@ -238,9 +237,10 @@ def schedule_item(self, element, ignore_buffer=False, *args, **kwargs):
**kwargs: keyword arguments that the wrapped dofn requires.
"""
done = False
- sleep_time = 1
+ sleep_time = 0.01
total_sleep = 0
- while not done:
+ timeout = 1
Review Comment:
The timeout duration could be made configurable via `__init__()` instead of being hard-coded here.
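
A minimal sketch of what that could look like. Everything here is an assumption for illustration: `SchedulerSketch` is a stand-in rather than the real `AsyncWrapper`, the `schedule_timeout` and `sleep_time` keyword names are hypothetical, and returning `False` on timeout is just one possible behaviour, not necessarily what the PR does.

```python
from time import monotonic, sleep


class SchedulerSketch:
  """Stand-in for AsyncWrapper; only the timeout handling is shown."""
  def __init__(self, schedule_timeout=1.0, sleep_time=0.01):
    # Hypothetical parameters: the diff hard-codes these as 1 and 0.01.
    self._schedule_timeout = schedule_timeout
    self._sleep_time = sleep_time

  def schedule_item(self, try_schedule):
    """Retries try_schedule() until it succeeds or the timeout elapses.

    try_schedule is a hypothetical callable that returns True once the
    item has been accepted. Returns False if the timeout is reached.
    """
    deadline = monotonic() + self._schedule_timeout
    while True:
      if try_schedule():
        return True
      if monotonic() >= deadline:
        return False
      sleep(self._sleep_time)
```

Callers that need a longer or shorter wait could then pass `schedule_timeout=...` at construction time instead of editing the method.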
##########
sdks/python/apache_beam/transforms/async_dofn.py:
##########
@@ -256,10 +256,12 @@ def schedule_item(self, element, ignore_buffer=False, *args, **kwargs):
total_sleep += sleep_time
sleep(sleep_time)
- def next_time_to_fire(self):
+ def next_time_to_fire(self, key):
+ random.seed(key)
return (
floor((time() + self._timer_frequency) / self._timer_frequency) *
- self._timer_frequency)
+ self._timer_frequency) + (
+ random.random() * self._timer_frequency)
Review Comment:
I feel like the work to find a round increment of `_timer_frequency` is
wasted compute once you add the extra jitter of
`random.random() * self._timer_frequency`, since the result is no longer on a
round increment afterwards.
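
To make the comparison concrete, here is a rough sketch of both variants as free functions. The function names and the `now` parameter are hypothetical and exist only so the two can be compared side by side. The sketch also uses a local `random.Random(key)` instead of the diff's `random.seed(key)`, which is a deliberate deviation so that the module-level generator is not reseeded. The "simple" variant is only one possible simplification and shifts the firing window slightly.

```python
import random
from math import floor
from time import time


def next_time_to_fire_rounded(key, timer_frequency, now=None):
  """Mirrors the formula in the diff: round to a boundary, then add jitter."""
  now = time() if now is None else now
  # Per-key jitter in [0, timer_frequency), deterministic for a given key.
  jitter = random.Random(key).random() * timer_frequency
  boundary = floor((now + timer_frequency) / timer_frequency) * timer_frequency
  return boundary + jitter


def next_time_to_fire_simple(key, timer_frequency, now=None):
  """Skips the rounding: fires between one and two periods from now."""
  now = time() if now is None else now
  jitter = random.Random(key).random() * timer_frequency
  return now + timer_frequency + jitter
```

In both cases the result is spread across a window of width `timer_frequency`, so the rounding step does not leave the timer on a round increment.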
##########
sdks/python/apache_beam/transforms/async_dofn.py:
##########
@@ -55,9 +56,8 @@ class AsyncWrapper(beam.DoFn):
TIMER_SET = ReadModifyWriteStateSpec('timer_set', coders.BooleanCoder())
TO_PROCESS = BagStateSpec(
'to_process',
- coders.TupleCoder([coders.StrUtf8Coder(), coders.StrUtf8Coder()]),
- )
- _timer_frequency = 20
+ coders.TupleCoder(
Review Comment:
This is a backwards-incompatible change, since you're swapping to a
different coder.
##########
sdks/python/apache_beam/transforms/async_dofn.py:
##########
@@ -103,7 +103,7 @@ def __init__(
self._uuid = uuid.uuid4().hex
self._parallelism = parallelism
self._max_wait_time = max_wait_time
- self._timer_frequency = 20
+ self._timer_frequency = callback_frequency
Review Comment:
As best I can tell, `self._timer_frequency` is used but
`self.timer_frequency_` is not. Is there any reason to have both? The same goes
for all of these duplicated fields.