I like the idea of pushing back to the source much better than
unboundedly buffering things in state. I was trying to think of how to
just slow things down, and one problem is that while we can easily
control the number of keys, it's much harder to control (or even
detect) the number of parallel threads at any given point in time (the
number of keys is only an upper bound on this, especially in batch).

On Wed, Feb 21, 2024 at 9:28 AM Reuven Lax <re...@google.com> wrote:
>
> Agreed that event-time throttling doesn't make sense here. In theory,
> processing-time timers have no SLA - i.e. their firing might be delayed - so
> batch runners aren't violating the model by firing them all at the end;
> however, this does make processing-time timers less useful in batch, as we see
> here.
>
> Personally, I'm not sure I would use state and timers to implement this, and 
> I definitely wouldn't create this many keys. A couple of reasons for this:
>   1. If a pipeline is receiving input faster than the throttle rate, the 
> proposed technique would shift all those elements into the DoFn's state, which
> will keep growing indefinitely. Generally we would prefer to leave that
> backlog in the source instead of copying it into DoFn state.
>   2. In my experience with throttling, having too much parallelism is 
> problematic. The issue is that there is some error involved whenever you 
> throttle, and this error can accumulate across many shards (and when I've 
> done this sort of thing before, I found that the error was often biased in 
> one direction). If targeting 100,000 records/sec, this approach (if I 
> understand it correctly) would create 100,000 shards and throttle them each 
> to one element/sec. I doubt this will actually result in anything close to 
> desired throttling.
>   3. Very commonly, the request is to throttle based on bytes/sec, not 
> events/sec. Anything we build should be easily extensible to bytes/sec.
>
> What I would suggest (and what Beam users have often done in the past) would 
> be to bucket the PCollection into N buckets where N is generally smallish 
> (100 buckets, 1000 buckets, depending on the expected throughput); runners 
> that support autosharding (such as Dataflow) can automatically choose N. Each 
> shard then throttles its output to rate/N. Keeping N no larger than necessary 
> minimizes the error introduced into throttling.
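>
> A rough sketch of the sharding step (names like input, numShards, and the
> use of String elements are illustrative; assuming the Java SDK):
>
>   // Bucket the input into numShards shards by assigning a random key; each
>   // shard will later throttle its own output to targetRate / numShards.
>   PCollection<KV<Integer, String>> sharded =
>       input.apply("AssignShard", ParDo.of(
>           new DoFn<String, KV<Integer, String>>() {
>             @ProcessElement
>             public void process(@Element String element,
>                 OutputReceiver<KV<Integer, String>> out) {
>               out.output(KV.of(ThreadLocalRandom.current().nextInt(numShards), element));
>             }
>           }));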
>
> We also don't necessarily need state/timers here - each shard is processed on 
> a single thread, so those threads can simply throttle calls to 
> OutputReceiver.output. This way if the pipeline is exceeding the threshold, 
> backpressure will tend to simply leave excess data in the source. This also 
> is a simpler design than the proposed one.
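>
> Concretely, the per-shard throttle could be as simple as the following
> sketch (assuming elements of a given shard key are processed serially, e.g.
> after a Reshuffle, and using Guava's com.google.common.util.concurrent.RateLimiter
> for pacing; perShardRate = rate/N is illustrative):
>
>   static class ThrottleFn<T> extends DoFn<KV<Integer, T>, T> {
>     private final double perShardRate;
>     private transient RateLimiter limiter;
>
>     ThrottleFn(double perShardRate) {
>       this.perShardRate = perShardRate;
>     }
>
>     @Setup
>     public void setup() {
>       limiter = RateLimiter.create(perShardRate);
>     }
>
>     @ProcessElement
>     public void process(@Element KV<Integer, T> kv, OutputReceiver<T> out) {
>       // Blocking here paces calls to OutputReceiver.output, leaving excess
>       // data in the source rather than buffering it in state.
>       limiter.acquire();
>       out.output(kv.getValue());
>     }
>   }
>
> Extending this to bytes/sec would just mean acquiring element-size permits
> (limiter.acquire(numBytes)) instead of a single permit per element.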
>
> A more sophisticated design might combine elements of both - buffering a 
> bounded amount of data in state when the threshold is exceeded, but severely 
> limiting the state size. However I wouldn't start here - we would want to 
> build the simpler implementation first and see how it performs.
>
> On Wed, Feb 21, 2024 at 8:53 AM Robert Bradshaw via dev <dev@beam.apache.org> 
> wrote:
>>
>> On Wed, Feb 21, 2024 at 12:48 AM Jan Lukavský <je...@seznam.cz> wrote:
>> >
>> > Hi,
>> >
>> > I have left a note regarding the proposed splitting of batch and
>> > streaming expansion of this transform. In general, a need for such a split
>> > triggers doubts in me. This signals that either
>> >
>> >   a) the transform does something it should not, or
>> >
>> >   b) the Beam model is not complete in terms of being "unified"
>> >
>> > The problem that is described in the document is that in the batch case
>> > timers are not fired appropriately.
>>
>> +1. The underlying flaw is that processing time timers are not handled
>> correctly in batch, but should be (even if it means keeping workers
>> idle?). We should fix this.
>>
>> > This is actually one of the
>> > motivations that led to the introduction of the @RequiresTimeSortedInput
>> > annotation and, though mentioned years ago as a question, I do not
>> > remember what arguments were used against enforcing sorting inputs by
>> > timestamp in the batch stateful DoFn as a requirement in the model. That
>> > would enable the appropriate firing of timers while preserving the batch
>> > invariant that no late data are allowed. IIRC there are
>> > runners that do this sorting by default (at least the sorting, not sure
>> > about the timers, but once inputs are sorted, firing timers is simple).
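>> >
>> > For reference, a minimal sketch of a stateful DoFn opting into
>> > time-sorted input in the Java SDK (the state and processing logic here
>> > are illustrative only):
>> >
>> >   static class OrderedFn extends DoFn<KV<String, Long>, Long> {
>> >     @StateId("seen")
>> >     private final StateSpec<ValueState<Long>> seen = StateSpecs.value();
>> >
>> >     @RequiresTimeSortedInput  // per key, elements arrive sorted by event timestamp
>> >     @ProcessElement
>> >     public void process(@Element KV<String, Long> e,
>> >         @StateId("seen") ValueState<Long> seen, OutputReceiver<Long> out) {
>> >       out.output(e.getValue());
>> >     }
>> >   }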
>> >
>> > A different question is if this particular transform should maybe fire
>> > not by event time, but rather processing time?
>>
>> Yeah, I was reading all of these as processing time. Throttling by
>> event time doesn't make much sense.
>>
>> > On 2/21/24 03:00, Robert Burke wrote:
>> > > Thanks for the design Damon! And thanks for collaborating with me on 
>> > > getting a high level textual description of the key implementation idea 
>> > > down in writing. I think the solution is pretty elegant.
>> > >
>> > > I do have concerns about how different Runners might handle 
>> > > ProcessContinuations for the Bounded Input case. I know Dataflow 
>> > > famously has two different execution modes under the hood, but I agree 
>> > > with the principle that ProcessContinuation.Resume should largely be in 
>> > > line with the expected delay, though it's by no means guaranteed AFAIK.
>> > >
>> > > We should also ensure this is linked from 
>> > > https://s.apache.org/beam-design-docs if not already.
>> > >
>> > > Robert Burke
>> > > Beam Go Busybody
>> > >
>> > > On 2024/02/20 14:00:00 Damon Douglas wrote:
>> > >> Hello Everyone,
>> > >>
>> >> The following describes a Throttle PTransform that limits element
>> >> throughput to minimize downstream API overuse. Thank you for reading and your
>> >> valuable input.
>> > >> valuable input.
>> > >>
>> > >> https://s.apache.org/beam-throttle-transform
>> > >>
>> > >> Best,
>> > >>
>> > >> Damon
>> > >>
