jrmccluskey commented on code in PR #36596:
URL: https://github.com/apache/beam/pull/36596#discussion_r2491163457
##########
sdks/python/apache_beam/transforms/async_dofn.py:
##########
@@ -256,10 +256,12 @@ def schedule_item(self, element, ignore_buffer=False, *args, **kwargs):
       total_sleep += sleep_time
       sleep(sleep_time)

-  def next_time_to_fire(self):
+  def next_time_to_fire(self, key):
+    random.seed(key)
     return (
         floor((time() + self._timer_frequency) / self._timer_frequency) *
-        self._timer_frequency)
+        self._timer_frequency) + (
+            random.random() * self._timer_frequency)
Review Comment:
Ah I see, makes sense. There may be a way to queue up per-key firing times,
but that would be a more substantial piece of work, since Beam timers don't
work that way themselves. The compute here is negligible, so I'm not
particularly worried about it. If we wanted to spend some memory instead, we
could pick an offset per key and store it. That would eliminate the spikes
while keeping a consistent cadence, with each key firing at the desired
frequency in steady state.
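
A rough sketch of the stored-offset idea, for illustration only. The class `PerKeyOffsetSchedule`, its in-memory dict, and the `now` parameter are hypothetical and not part of `async_dofn.py`; a real change would presumably keep the offset somewhere durable per key rather than in a plain dict.

```python
import random
from math import floor
from time import time


class PerKeyOffsetSchedule:
  """Hypothetical helper: one random offset per key, chosen once and reused."""
  def __init__(self, timer_frequency, rng=None):
    self._timer_frequency = timer_frequency
    # A dedicated Random instance avoids reseeding the global generator.
    self._rng = rng or random.Random()
    # Assumption: a dict stands in for whatever per-key storage is used.
    self._offsets = {}

  def offset_for(self, key):
    # Pick the offset on first use, then return the stored value.
    if key not in self._offsets:
      self._offsets[key] = self._rng.random() * self._timer_frequency
    return self._offsets[key]

  def next_time_to_fire(self, key, now=None):
    now = time() if now is None else now
    offset = self.offset_for(key)
    freq = self._timer_frequency
    # Next point on the grid n * freq + offset that lies after `now`.
    return (floor((now - offset) / freq) + 1) * freq + offset
```

With this shape, each key fires on its own grid `n * timer_frequency + offset`, so firings are spread across the interval but any single key still fires once per `timer_frequency`.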