The direct runner blows up memory usage as well. Currently, the size limitations and performance characteristics of BagState are a property of the runner. I don't know whether this kind of thing can or should be lifted into the model itself, though it might make sense to publish expectations and recommendations for what makes a "performant" runner.
I think the comment about regressing is specific to the Dataflow runner, in that it might get better, but we won't make it worse.

On Tue, Feb 28, 2017 at 1:32 PM, Beau Fabry <[email protected]> wrote:

> Could I get some clarification on "But the property of being able to
> exceed memory won't be regressed, whatever strategy is adopted."? It seems
> like both Dataflow batch mode and Flink (all modes) currently could blow
> memory usage if you attempt to read a large BagState, so should I take
> that to mean that a design that assumes a BagState can be arbitrarily
> large is inappropriate?
>
> On Tue, Feb 28, 2017 at 1:21 PM Kenneth Knowles <[email protected]> wrote:
>
>> Yes, currently the Dataflow runner does read BagState lazily in
>> streaming mode. In batch mode it uses in-memory state at the moment.
>>
>> But I want to re-emphasize that the APIs are designed to allow many
>> possible implementations and styles of optimization, so it can (and
>> likely will) change over time. But the property of being able to exceed
>> memory won't be regressed, whatever strategy is adopted.
>>
>> Kenn
>>
>> On Tue, Feb 28, 2017 at 12:39 PM, Aljoscha Krettek <[email protected]> wrote:
>>
>> Just to confirm: yes, the Flink Runner does (currently) not read lazily.
>>
>> On Tue, 28 Feb 2017 at 21:26 Beau Fabry <[email protected]> wrote:
>>
>> Ok, this might be out of the scope of the mailing list, but do you know
>> if the Dataflow runner specifically reads BagState lazily? Our read of
>> the Flink runner code is that it does not.
>>
>> On Tue., 28 Feb. 2017, 10:27 am Robert Bradshaw, <[email protected]> wrote:
>>
>> Yes, the intent is that runners can (should?) support streaming of
>> BagState reads. Of course reading a large state every time could still
>> be expensive... (One of the key features of BagState is that it supports
>> blind writes, i.e. you can append cheaply without reading.)
>>
>> On Tue, Feb 28, 2017 at 10:08 AM, Beau Fabry <[email protected]> wrote:
>>
>> Awesome, thanks for the answers. I have one more: is the Iterable
>> returned by BagState backed by some sort of lazy loading from disk, like
>> the Iterable result of a GroupByKey, as mentioned here:
>> http://stackoverflow.com/questions/34136919/cogbkresult-has-more-than-10000-elements-reiteration-which-may-be-slow-is-requ/34140487#34140487
>>
>> We're joining a small amount of infrequently changing data (per-key, but
>> still far too big globally to fit in memory as a side input) to a very
>> large, frequently updated stream, and trying to figure out how to reduce
>> our redundant outputs.
>>
>> Cheers,
>> Beau
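The blind-write/lazy-read distinction Robert describes is easy to see in code. Below is a minimal sketch of a buffering stateful DoFn, in the spirit of the blog post's Java examples. The key type, the "flush" sentinel, and the element handling are purely illustrative, and the state API's package names and signatures have shifted across Beam versions; this sketch uses the one-parameter StateSpec<BagState<T>> shape from later releases.

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

new DoFn<KV<String, String>, KV<String, String>>() {

  // One bag per key (and window): buffered values awaiting a flush.
  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec =
      StateSpecs.bag(StringUtf8Coder.of());

  @ProcessElement
  public void processElement(
      ProcessContext context,
      @StateId("buffer") BagState<String> buffer) {
    String value = context.element().getValue();
    if (!"flush".equals(value)) {
      // Blind write: appends without reading the existing contents,
      // so it stays cheap even when the bag is large.
      buffer.add(value);
    } else {
      // Read: whether this Iterable is loaded lazily or all at once
      // is up to the runner, so iterate once rather than
      // materializing it yourself if the bag can be large.
      for (String buffered : buffer.read()) {
        context.output(KV.of(context.element().getKey(), buffered));
      }
      buffer.clear();
    }
  }
};

Wrapped in ParDo.of(...) over a keyed PCollection, every non-flush element takes the append-only path, which is why blind writes stay cheap regardless of how the runner stores the bag.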
>> On Tue, Feb 21, 2017 at 8:04 PM Kenneth Knowles <[email protected]> wrote:
>>
>> Hi Beau,
>>
>> I've answered inline.
>>
>> On Tue, Feb 21, 2017 at 5:09 PM, Beau Fabry <[email protected]> wrote:
>>
>> In times when communicating between outputs would be an optimisation,
>> and reducing inputs in parallel would be an optimisation, how do you
>> make the call between using a stateful DoFn or a Combine.perKey?
>>
>> (Standard optimization caveat: you'd make the call by measuring them both.)
>>
>> Usually you will be forced to make this decision not based on
>> optimization, but based on whether your use case can be expressed
>> naturally as one or the other.
>>
>> The key difference is just the one I talk about in the blog post, the
>> associative `merge` operation. This enables nice optimizations, so if
>> your use case fits nicely as a CombineFn then you should probably use
>> it. There are exceptions, like fine-grained access to just a piece of
>> your accumulator state, but the most common way to combine things is
>> with Combine :-)
>>
>> You can always follow a Combine.perKey with a stateful ParDo that
>> communicates between outputs, if that also presents a major savings.
>>
>> Is there a case to be made that CombineFn could also get access to
>> state within extractOutput? It seems like the only benefit to a
>> CombineFn now is that the merge and addInput steps can run on multiple
>> workers. Is there a rule of thumb to know if we have enough data per
>> key that that is significant?
>>
>> This is a big benefit a lot of the time, especially in large batch
>> executions. Without this optimization opportunity, a Combine.perKey
>> would execute by first doing a GroupByKey - shuffling all of your data -
>> and then applying your CombineFn to each KV<K, Iterable<V>>. But
>> instead, this can be optimized so that the combine happens before the
>> shuffle, which can reduce the shuffled data to a single accumulator per
>> key. This optimization is general enough that my description applies to
>> all runners, though it has lesser benefits in streaming executions where
>> there is not as much data per key+window+triggering.
>>
>> Kenn
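Kenn's description of combining before the shuffle is exactly the contract that a CombineFn's shape encodes: addInput can run on each worker, and because mergeAccumulators is associative, the per-worker accumulators can be merged after the shuffle, so only one small accumulator per key and worker crosses the network. A minimal sketch under those assumptions (a hypothetical per-key mean, not code from the thread):

import java.io.Serializable;
import org.apache.beam.sdk.transforms.Combine;

public class MeanFn extends Combine.CombineFn<Double, MeanFn.Accum, Double> {

  // Accumulator shipped across the shuffle instead of the raw elements.
  // Serializable so the default coder registry's fallback can encode it;
  // a production version would register an explicit coder.
  public static class Accum implements Serializable {
    double sum = 0;
    long count = 0;
  }

  @Override
  public Accum createAccumulator() {
    return new Accum();
  }

  @Override
  public Accum addInput(Accum acc, Double input) {
    // Runs worker-local, before the shuffle.
    acc.sum += input;
    acc.count++;
    return acc;
  }

  @Override
  public Accum mergeAccumulators(Iterable<Accum> accums) {
    // Associative merge: order and grouping of partial accumulators
    // don't matter, which is what gives the runner its freedom.
    Accum merged = createAccumulator();
    for (Accum acc : accums) {
      merged.sum += acc.sum;
      merged.count += acc.count;
    }
    return merged;
  }

  @Override
  public Double extractOutput(Accum acc) {
    return acc.count == 0 ? Double.NaN : acc.sum / acc.count;
  }
}

Applying it with Combine.perKey(new MeanFn()) gives the runner the freedom Kenn describes; a plain GroupByKey followed by a ParDo computing the mean would shuffle every element instead.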
>> On Wed, Feb 15, 2017 at 8:53 AM Ismaël Mejía <[email protected]> wrote:
>>
>> Great post, I like the use of the previous figure style with geometric
>> forms and colors, as well as the table analogy that really helps to
>> understand the concepts. I am still digesting some of the consequences
>> of the State API, in particular the implications of using state that you
>> mention at the end. Really good to discuss those also as part of the post.
>>
>> I found some small typos and formatting issues that I addressed here:
>> https://github.com/apache/beam-site/pull/156
>>
>> Thanks for writing,
>> Ismaël
>>
>> On Tue, Feb 14, 2017 at 11:50 AM, Jean-Baptiste Onofré <[email protected]> wrote:
>>
>> Hey Kenn,
>>
>> Just took a quick look and it's a great post!
>>
>> Thanks
>> Regards
>> JB
>>
>> On Feb 13, 2017, at 18:44, Kenneth Knowles <[email protected]> wrote:
>>
>> Hi all,
>>
>> I've just published a blog post about Beam's new stateful processing
>> capabilities:
>>
>> https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>>
>> The blog post covers stateful processing from a few angles: how it
>> works, how it fits into the Beam model, what you might use it for, and
>> finally some examples of stateful Beam code.
>>
>> I'd love for you to take a look and see how this feature might apply to
>> your use of Beam. And, of course, I'd also love to hear from you about it.
>>
>> Kenn