Do individual stages of a Beam job exhibit backpressure to the consumer, though? I would think buffering elements with Beam's BagState might lead to OOM errors on the workers if the consumer IO continues to feed in data. Or does something else happen?
--Vincent

On Thu, Jun 17, 2021 at 11:42 AM Luke Cwik <[email protected]> wrote:

> If the service returns sensible throttling errors, you could use a
> StatefulDoFn, buffer elements that error out due to throttling from the
> service instead of failing the bundle, and schedule a timer to replay them.
> This will effectively max out the service as long as there is more data
> than the service can handle, which doesn't work too well if the service.
>
> On Fri, Apr 16, 2021 at 6:20 PM Daniel Thevessen <[email protected]> wrote:
>
>> Thanks for the quick response.
>> Querying the Dataflow API seems like something that could break easily,
>> but I can go with that if it turns out to be easier.
>>
>> The Splittable DoFn way sounds interesting, but I'm not very familiar
>> with it, so I have some questions:
>> Splits seem to operate on offsets within a single element. Does that mean
>> that I'd set a fixed shard number x, and then I'd need to first group my
>> PCollection of single elements into a PCollection of lists, each of size x?
>> And are the subsequent writes also limited to x workers, meaning that
>> splits have the same issue as a GroupByKey?
>> I see the UnboundedCountingSource gets a `desiredNumSplits` parameter.
>> I'm assuming there is nothing similar that would allow a Splittable DoFn
>> to simply figure out the number of workers even if it changes? That's
>> probably very hacky anyway.
>> If the above solution with fixed-size lists makes sense and will
>> redistribute the writes, I'm already happy; I don't necessarily need the
>> throttling step to dynamically match autoscaling.
>>
>> On Thu, Apr 15, 2021 at 4:20 PM Pablo Estrada <[email protected]> wrote:
>>
>>> You could implement a Splittable DoFn that generates a limited number
>>> of splits. We do something like this for
>>> GenerateSequence.from(X).withRate(...) via UnboundedCountingSource [1].
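Luke's buffer-and-replay suggestion can be sketched outside of Beam. In a real pipeline the buffer would live in a StatefulDoFn's BagState and `replay()` would be fired by a processing-time timer; here a plain `Deque` and an explicit `replay()` call stand in for both, and all class and method names are hypothetical, not Beam APIs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

// Buffer elements the service throttles instead of failing the bundle,
// then replay them later. In Beam: buffered -> BagState, replay() -> timer.
class ThrottleBuffer<T> {
  private final Predicate<T> trySend; // returns false when the service throttles
  private final Deque<T> buffered = new ArrayDeque<>();

  ThrottleBuffer(Predicate<T> trySend) {
    this.trySend = trySend;
  }

  // Attempt delivery; buffer the element instead of failing if throttled.
  void process(T element) {
    if (!trySend.test(element)) {
      buffered.addLast(element); // would be bagState.add(element) in Beam
    }
  }

  // Timer callback: retry everything buffered, re-buffering what still fails.
  void replay() {
    List<T> toRetry = new ArrayList<>(buffered);
    buffered.clear(); // would be bagState.clear() in Beam
    for (T element : toRetry) {
      process(element);
    }
  }

  int bufferedCount() {
    return buffered.size();
  }
}
```

This is also where Vincent's OOM concern shows up: nothing in this sketch bounds `buffered`, so if the upstream keeps feeding data faster than replays drain it, the buffer (or BagState) grows without limit.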
>>> It keeps track of its local EPS and generates new splits if more EPS is
>>> wanted. This should help you scale up to the maximum EPS that you want,
>>> and autoscaling will only produce the appropriate number of workers for
>>> that number of splits.
>>>
>>> The only issue may be that you can't "scale down" if you find that some
>>> of your splits have very low throughput, because two splits can't be
>>> merged back together (does that make sense?) - but Dataflow should be
>>> able to scale down and schedule multiple splits on a single worker if
>>> that's the case.
>>>
>>> The UnboundedCountingSource is a Source, so it can't have an input (and
>>> it's deprecated), but you could write a Splittable DoFn that has the
>>> same behavior. Do you think this could work?
>>>
>>> [1] https://github.com/apache/beam/blob/8c9605f224115507912cf72e02d3fa94905548ec/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java#L334-L348
>>>
>>> On Thu, Apr 15, 2021 at 4:11 PM Evan Galpin <[email protected]> wrote:
>>>
>>>> Could you possibly use a side input with fixed interval triggering [1]
>>>> to query the Dataflow API to get the most recent log statement of
>>>> scaling, as suggested here [2]?
>>>>
>>>> [1] https://beam.apache.org/documentation/patterns/side-inputs/
>>>> [2] https://stackoverflow.com/a/54406878/6432284
>>>>
>>>> On Thu, Apr 15, 2021 at 18:14 Daniel Thevessen <[email protected]> wrote:
>>>>
>>>>> Hi folks,
>>>>>
>>>>> I've been working on a custom PTransform that makes requests to
>>>>> another service, and would like to add a rate-limiting feature there.
>>>>> The fundamental issue I'm running into is that I need a decent
>>>>> heuristic to estimate the worker count, so that each worker can
>>>>> independently set a limit which globally comes out to the right value.
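Pablo's split-generation idea reduces to simple arithmetic: each split knows the rate it can sustain locally and asks for another split while aggregate throughput is short of the target, and splits are only ever added, never merged. The sketch below models only that decision logic; the names and the fixed per-split rate are assumptions for illustration, not the actual UnboundedCountingSource code referenced in [1].

```java
// Toy model of a restriction that grows its split count up to a target EPS.
// Splits can only be added (never merged), matching the "can't scale down"
// caveat in the thread; the runner then schedules workers for that count.
class SplitPlanner {
  private final long targetEps;   // desired global events/sec
  private final long perSplitEps; // throughput one split can sustain
  private int splits = 1;

  SplitPlanner(long targetEps, long perSplitEps) {
    this.targetEps = targetEps;
    this.perSplitEps = perSplitEps;
  }

  // Called periodically: add one split while aggregate EPS is below target.
  boolean maybeSplit() {
    if ((long) splits * perSplitEps < targetEps) {
      splits++;
      return true;
    }
    return false; // at capacity; no further splits are generated
  }

  int splitCount() {
    return splits;
  }
}
```

The upshot for rate limiting: the split count, not the worker count, becomes the cap, so autoscaling can vary workers underneath without changing the global rate.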
>>>>> All of this is easy if I know how many machines I have, but I'd like
>>>>> to use Dataflow's autoscaling, which would easily break any
>>>>> pre-configured value.
>>>>> I have seen two main approaches for rate limiting, both for a
>>>>> configurable variable x:
>>>>>
>>>>> - Simply assume the worker count is x, then divide by x to figure out
>>>>> the "local" limit. The issue I have here is that if we assume x is
>>>>> 500 but it is actually 50, I'm now paying for 50 nodes to throttle 10
>>>>> times as much as necessary. I know the pipeline options have a
>>>>> reference to the runner; is it possible to get an approximate current
>>>>> worker count from that at bundle start (*if* the runner is
>>>>> DataflowRunner)?
>>>>> - Add another PTransform in front of the API requests, which groups
>>>>> by x number of keys, throttles, and keeps forwarding elements with an
>>>>> instant trigger. I initially really liked this solution because even
>>>>> if x is misconfigured, I will have at most x workers running and
>>>>> throttle appropriately. However, I noticed that for batch pipelines
>>>>> this effectively also caps the API request stage at x workers. If I
>>>>> throw in a `Reshuffle`, there is another GroupByKey (-> another
>>>>> stage), and nothing gets done until every element has passed through
>>>>> the throttler.
>>>>>
>>>>> Has anyone here tried to figure out rate limiting with Beam before,
>>>>> and perhaps run into similar issues? I would love to know if there is
>>>>> a preferred solution to this type of problem.
>>>>> I know sharing state like that runs a little counter to the Beam
>>>>> pipeline paradigm, but really all I need is an approximate worker
>>>>> count with few guarantees.
>>>>>
>>>>> Cheers,
>>>>> Daniel
>>
>> --
>> Daniel Thevessen | Site Reliability Engineer
>> Firestore SRE
>> San Francisco, CA, USA | +1 (415) 373-7762

~Vincent
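Daniel's first approach (divide an assumed global limit by a configured worker count x and enforce the local share with a token bucket) can be sketched in plain Java. The class below is a minimal, hypothetical illustration, not a Beam API; time is passed in explicitly so the behavior is deterministic. It also makes the stated failure mode concrete: if x overestimates the real worker count, every worker's share — and therefore the fleet's total rate — is proportionally too small.

```java
// Each worker enforces globalLimit / assumedWorkers requests per second
// with a token bucket allowing roughly one second of burst.
class LocalRateLimiter {
  private final double tokensPerMilli;
  private final double maxTokens;
  private double tokens;
  private long lastRefillMillis;

  LocalRateLimiter(double globalLimitPerSec, int assumedWorkers, long nowMillis) {
    double localLimitPerSec = globalLimitPerSec / assumedWorkers;
    this.tokensPerMilli = localLimitPerSec / 1000.0;
    this.maxTokens = localLimitPerSec; // ~1s of burst capacity
    this.tokens = maxTokens;
    this.lastRefillMillis = nowMillis;
  }

  // Returns true if a request may proceed now, false if it should throttle.
  boolean tryAcquire(long nowMillis) {
    tokens = Math.min(maxTokens, tokens + (nowMillis - lastRefillMillis) * tokensPerMilli);
    lastRefillMillis = nowMillis;
    if (tokens >= 1.0) {
      tokens -= 1.0;
      return true;
    }
    return false;
  }
}
```

With a global limit of 500/s and an assumed 500 workers, each worker allows 1/s; if only 50 workers actually exist, the fleet sends 50/s instead of the permitted 500/s — the 10x over-throttling described above.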
