Do individual stages of a Beam job apply backpressure to the consumer,
though? I would think buffering elements in Beam's BagState might lead
to OOM errors on the workers if the consumer IO continues to feed in data.
Or does something else happen?
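A back-of-the-envelope sketch of this concern, using assumed numbers: if the source keeps producing faster than the throttled service drains, the buffered backlog grows linearly with time. The rates and element size are hypothetical, and the sketch says nothing about where a particular runner actually keeps its state.

```java
// Hypothetical illustration: backlog growth when input outpaces a throttled service.
// Rates are in elements per second; all values passed in are assumptions.
final class BacklogEstimate {
    private BacklogEstimate() {}

    /** Elements left buffered after `seconds` of sustained excess input (never negative). */
    static long backlogAfter(long inputEps, long serviceEps, long seconds) {
        return Math.max(0L, (inputEps - serviceEps) * seconds);
    }

    /** Approximate memory the backlog occupies, given an assumed average element size. */
    static long bytesBuffered(long backlog, long avgElementBytes) {
        return backlog * avgElementBytes;
    }
}
```

For example, 1,500 elements/s in against 1,000 elements/s out leaves 300,000 elements buffered after ten minutes; the backlog only stops growing when input drops below the service rate.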

--Vincent


On Thu, Jun 17, 2021 at 11:42 AM Luke Cwik <[email protected]> wrote:

> If the service returns sensible throttling errors, you could use a
> stateful DoFn: instead of failing the bundle, buffer the elements that
> error out due to throttling from the service and schedule a timer to
> replay them. This will effectively max out the service as long as there
> is more data than the service can handle, which doesn't work too well if
> the service.
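One way to picture this suggestion, written as a plain-Java simulation rather than Beam code: throttled elements go into a buffer, and a timer callback later replays them. In the Beam Java SDK the buffer would presumably be a `BagState` and the callback an `@OnTimer` method driven by a processing-time `Timer`; here both are simulated, and the `Service` interface is a hypothetical stand-in for the real client.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Plain-Java simulation of "buffer throttled elements, replay them on a timer".
// In a Beam stateful DoFn the deque would be a BagState and onTimer() would be
// an @OnTimer callback; neither Beam API is used here.
final class ThrottleBuffer<T> {

    /** Hypothetical service client: returns false when the call was throttled. */
    interface Service<E> {
        boolean trySend(E element);
    }

    private final Service<T> service;
    private final Deque<T> buffered = new ArrayDeque<>();
    private boolean timerArmed = false;

    ThrottleBuffer(Service<T> service) {
        this.service = service;
    }

    /** Sends the element, or buffers it if a backlog exists or the service throttles. */
    void process(T element) {
        // Queue behind an existing backlog to keep order and avoid hitting a throttled service.
        if (!buffered.isEmpty() || !service.trySend(element)) {
            buffered.addLast(element);
            timerArmed = true; // stands in for arming a processing-time timer
        }
    }

    /** Timer callback: replays buffered elements until the service throttles again. */
    void onTimer() {
        timerArmed = false;
        while (!buffered.isEmpty()) {
            if (!service.trySend(buffered.peekFirst())) {
                timerArmed = true; // still throttled: re-arm and retry later
                return;
            }
            buffered.removeFirst();
        }
    }

    int bufferedCount() {
        return buffered.size();
    }

    boolean isTimerArmed() {
        return timerArmed;
    }
}
```

The buffer in this sketch is unbounded, which is exactly the memory concern raised at the top of the thread.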
>
>
> On Fri, Apr 16, 2021 at 6:20 PM Daniel Thevessen <[email protected]>
> wrote:
>
>> Thanks for the quick response.
>> Querying the Dataflow API seems like something that could break easily,
>> but I can go with that if it turns out to be easier.
>>
>> The Splittable DoFn way sounds interesting, but I'm not very familiar
>> with that so I have some questions around it:
>> Splits seem to operate on offsets within a single element. Does that mean
>> that I'd set a fixed shard number x, and then I'd need to first group my
>> PCollection of single elements into a PCollection of lists, each size x?
>> And are the subsequent writes also limited to x workers, meaning that
>> splits have the same issue as with a GroupByKey?
>> I see the UnboundedCountingSource gets a `desiredNumSplits` parameter.
>> I'm assuming there is nothing similar that would allow a Splittable DoFn to
>> simply figure out the number of workers even if it changes? That's probably
>> very hacky anyway.
>> If the above solution with fixed-size lists makes sense and will
>> redistribute the writes, I'm already happy; I don't necessarily need the
>> throttling step to dynamically match autoscaling.
>>
>> On Thu, Apr 15, 2021 at 4:20 PM Pablo Estrada <[email protected]> wrote:
>>
>>> You could implement a Splittable DoFn that generates a limited number of
>>> splits. We do something like this for
>>> GenerateSequence.from(X).withRate(...) via UnboundedCountingSource[1]. It
>>> keeps track of its local EPS (elements per second) and generates new splits
>>> if more EPS is wanted. This should help you scale up to the maximum EPS you
>>> want, and autoscaling will only produce the appropriate number of workers
>>> for that number of splits.
>>>
>>> - The only issue may be that you can't "scale down" if you find that
>>> some of your splits have a very low throughput, because two splits can't be
>>> merged back together (does that make sense?) - but Dataflow should be able
>>> to scale down and schedule multiple splits in a single worker if that's the
>>> case.
>>>
>>> The UnboundedCountingSource is a Source, so it can't have an input (and
>>> it's deprecated), but you could write a SplittableDoFn that has the same
>>> behavior. Do you think this could work?
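A sketch of the split arithmetic behind this idea, assuming the global target rate is spread evenly across splits and that each split can measure the rate it actually sustains. `SplitPlanner` and its methods are hypothetical; this is not how `UnboundedCountingSource` is implemented.

```java
// Hypothetical helper: how many splits a rate-limited splittable DoFn might
// request so that splits * perSplitEps reaches the global target EPS.
final class SplitPlanner {
    private SplitPlanner() {}

    /** Splits needed to reach targetEps when each split sustains perSplitEps, capped at maxSplits. */
    static int splitsNeeded(double targetEps, double perSplitEps, int maxSplits) {
        if (targetEps <= 0 || perSplitEps <= 0 || maxSplits < 1) {
            throw new IllegalArgumentException("rates must be positive and maxSplits >= 1");
        }
        int needed = (int) Math.ceil(targetEps / perSplitEps);
        return Math.min(needed, maxSplits);
    }

    /** Rate each split should enforce so that all splits together stay at targetEps. */
    static double perSplitLimit(double targetEps, int splits) {
        return targetEps / splits;
    }
}
```

If each split sustains about 120 EPS and the target is 1,000 EPS, `splitsNeeded(1000, 120, 64)` asks for 9 splits; the cap keeps a slow service from triggering an unbounded number of splits.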
>>>
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/8c9605f224115507912cf72e02d3fa94905548ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java#L334-L348
>>>
>>> On Thu, Apr 15, 2021 at 4:11 PM Evan Galpin <[email protected]>
>>> wrote:
>>>
>>>> Could you possibly use a side input with fixed interval triggering[1]
>>>> to query the Dataflow API for the most recent autoscaling log statement,
>>>> as suggested here[2]?
>>>>
>>>> [1]
>>>> https://beam.apache.org/documentation/patterns/side-inputs/
>>>> [2]
>>>> https://stackoverflow.com/a/54406878/6432284
>>>>
>>>> On Thu, Apr 15, 2021 at 18:14 Daniel Thevessen <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi folks,
>>>>>
>>>>> I've been working on a custom PTransform that makes requests to
>>>>> another service, and would like to add a rate limiting feature there. The
>>>>> fundamental issue that I'm running into here is that I need a decent
>>>>> heuristic to estimate the worker count, so that each worker can
>>>>> independently set a limit which globally comes out to the right value. All
>>>>> of this is easy if I know how many machines I have, but I'd like to use
>>>>> Dataflow's autoscaling, which would easily break any pre-configured value.
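For reference, a per-worker limit like this is commonly enforced with a token bucket. The following is a minimal, hypothetical sketch; each worker would construct it with its share of the global limit, e.g. `globalQps / assumedWorkerCount`. Guava's `RateLimiter` provides similar behaviour if a library is preferred.

```java
import java.util.function.LongSupplier;

// Minimal token-bucket limiter a single worker could run with its share of the
// global limit. The nanosecond clock is injected so behaviour is testable.
final class LocalRateLimiter {
    private final double permitsPerSecond;
    private final double maxBurst;
    private final LongSupplier nanoClock;
    private double available;
    private long lastRefillNanos;

    LocalRateLimiter(double permitsPerSecond, double maxBurst, LongSupplier nanoClock) {
        if (permitsPerSecond <= 0 || maxBurst < 1) {
            throw new IllegalArgumentException("rate must be > 0 and burst >= 1");
        }
        this.permitsPerSecond = permitsPerSecond;
        this.maxBurst = maxBurst;
        this.nanoClock = nanoClock;
        this.available = maxBurst; // start with a full bucket
        this.lastRefillNanos = nanoClock.getAsLong();
    }

    /** Consumes a permit and returns true if one is available right now. */
    synchronized boolean tryAcquire() {
        refill();
        if (available >= 1.0) {
            available -= 1.0;
            return true;
        }
        return false;
    }

    /** Adds permits for the time elapsed since the last refill, up to maxBurst. */
    private void refill() {
        long now = nanoClock.getAsLong();
        double elapsedSeconds = (now - lastRefillNanos) / 1e9;
        available = Math.min(maxBurst, available + elapsedSeconds * permitsPerSecond);
        lastRefillNanos = now;
    }
}
```

In production the clock would be `System::nanoTime`, e.g. `new LocalRateLimiter(1000.0 / 50, 20, System::nanoTime)`; the hard part this thread is about remains choosing the divisor.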
>>>>> I have seen two main approaches for rate limiting, both for a
>>>>> configurable variable x:
>>>>>
>>>>>    - Simply assume the worker count is x, then divide by x to figure out
>>>>>    the "local" limit. The issue I have here is that if we assume x is 500,
>>>>>    but it is actually 50, I'm now paying for 50 nodes to throttle 10 times
>>>>>    as much as necessary. I know the pipeline options have a reference to
>>>>>    the runner; is it possible to get an approximate current worker count
>>>>>    from that at bundle start (*if* the runner is DataflowRunner)?
>>>>>    - Add another PTransform in front of the API requests, which
>>>>>    groups elements into x keys, throttles, and keeps forwarding elements
>>>>>    with an instant trigger. I initially really liked this solution because
>>>>>    even if x is misconfigured, I will have at most x workers running and
>>>>>    throttle appropriately. However, I noticed that for batch pipelines, this
>>>>>    effectively also caps the API request stage at x workers. If I throw in
>>>>>    a `Reshuffle`, there is another GroupByKey (-> another stage), and
>>>>>    nothing gets done until every element has passed through the throttler.
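The arithmetic behind the two approaches, as a hypothetical sketch with an assumed global limit of 1,000 QPS. For the first approach, assuming 500 workers when only 50 are running caps each worker at 2 QPS, so the whole fleet reaches 100 QPS, a tenth of the limit. For the second, elements are mapped onto x keys ahead of the GroupByKey, which is also what bounds that stage's parallelism to x.

```java
// Hypothetical helpers illustrating both rate-limiting approaches.
final class ThrottleMath {
    private ThrottleMath() {}

    /** Approach 1: each worker enforces globalQps / assumedWorkers. */
    static double localLimit(double globalQps, int assumedWorkers) {
        return globalQps / assumedWorkers;
    }

    /** Rate the whole fleet actually achieves when only actualWorkers are running. */
    static double effectiveGlobalRate(double globalQps, int assumedWorkers, int actualWorkers) {
        return localLimit(globalQps, assumedWorkers) * actualWorkers;
    }

    /** Approach 2: maps an element onto one of `shards` keys before the GroupByKey. */
    static int shardKey(Object element, int shards) {
        // floorMod keeps the key non-negative even when hashCode() is negative.
        return Math.floorMod(element.hashCode(), shards);
    }
}
```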
>>>>>
>>>>> Has anyone here tried to figure out rate limiting with Beam before,
>>>>> and perhaps run into similar issues? I would love to know if there is a
>>>>> preferred solution to this type of problem.
>>>>> I know sharing state like that runs a little counter to the Beam
>>>>> pipeline paradigm, but really all I need is an approximate worker count
>>>>> with few guarantees.
>>>>>
>>>>> Cheers,
>>>>> Daniel
>>>>>
>>>>
>>
>> --
>>
>> Daniel Thevessen  |  Site Reliability Engineer
>>
>> Firestore SRE
>> San Francisco, CA, USA |  +1 (415) 373-7762
>>
>
