Could I get some clarification on "But the property of being able to exceed
memory won't be regressed, whatever strategy is adopted."? It seems like
both Dataflow in batch mode and Flink in all modes could currently blow
memory if you attempt to read a large BagState, so should I take that to
mean that a design that assumes a BagState can be arbitrarily large is
inappropriate?

On Tue, Feb 28, 2017 at 1:21 PM Kenneth Knowles <k...@google.com> wrote:

> Yes, the Dataflow runner does currently read BagState lazily in streaming
> mode. In batch mode it uses in-memory state for now.
>
> But I want to re-emphasize that the APIs are designed to allow many
> possible implementations and styles of optimization, so the implementation
> can (and likely will) change over time. But the property of being able to
> exceed memory won't be regressed, whatever strategy is adopted.
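>
> As a concrete illustration of the usage pattern that preserves that
> property (a minimal sketch in recent Beam Java; imports elided, and the
> state API's package names and signatures vary across Beam versions):
> iterate the result of read() directly rather than copying it into an
> in-memory collection, so that a lazy runner can stream the values in:
>
>   private static class ReadBagFn extends DoFn<KV<String, Long>, Long> {
>     @StateId("values")
>     private final StateSpec<BagState<Long>> valuesSpec =
>         StateSpecs.bag(VarLongCoder.of());
>
>     @ProcessElement
>     public void process(ProcessContext c,
>         @StateId("values") BagState<Long> values) {
>       // Iterating read() lets a lazy implementation page values in;
>       // collecting into a List would force the whole bag into memory.
>       for (Long value : values.read()) {
>         c.output(value);
>       }
>     }
>   }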
>
> Kenn
>
> On Tue, Feb 28, 2017 at 12:39 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> Just to confirm: yes, the Flink runner does not (currently) read lazily.
>
> On Tue, 28 Feb 2017 at 21:26 Beau Fabry <bfa...@zendesk.com> wrote:
>
> OK, this might be out of scope for the mailing list, but do you know if
> the Dataflow runner specifically reads BagState lazily? Our reading of the
> Flink runner code is that it does not.
>
> On Tue., 28 Feb. 2017, 10:27 am Robert Bradshaw, <rober...@google.com>
> wrote:
>
> Yes, the intent is that runners can (should?) support streaming of
> BagState reads. Of course, reading a large state on every element could
> still be expensive... (One of the key features of BagState is that it
> supports blind writes, i.e. you can append cheaply without reading.)
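>
> To make "blind writes" concrete, a minimal sketch (illustrative Beam Java,
> imports elided): the hot path appends without ever calling read(), so the
> cost per element stays constant no matter how large the bag grows:
>
>   private static class BufferFn extends DoFn<KV<String, String>, Void> {
>     @StateId("buffer")
>     private final StateSpec<BagState<String>> bufferSpec =
>         StateSpecs.bag(StringUtf8Coder.of());
>
>     @ProcessElement
>     public void process(ProcessContext c,
>         @StateId("buffer") BagState<String> buffer) {
>       // Append-only: no read, no deserialization of existing contents.
>       buffer.add(c.element().getValue());
>     }
>   }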
>
>
> On Tue, Feb 28, 2017 at 10:08 AM, Beau Fabry <bfa...@zendesk.com> wrote:
>
> Awesome, thanks for the answers. I have one more: is the Iterable returned
> by BagState backed by some sort of lazy loading from disk, like the
> Iterable result of a GroupByKey, as mentioned here:
> http://stackoverflow.com/questions/34136919/cogbkresult-has-more-than-10000-elements-reiteration-which-may-be-slow-is-requ/34140487#34140487
> ?
>
> We're joining a small amount of infrequently changing data (small per key,
> but still far too big globally to fit in memory as a side input) to a very
> large, frequently updated stream, and trying to figure out how to reduce
> our redundant outputs.
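>
> (For concreteness, one shape the "reduce redundant outputs" part could
> take - a hypothetical sketch in Beam Java, imports elided, EmitIfChangedFn
> being a made-up name: keep the last value emitted per key in ValueState
> and suppress outputs that would repeat it.)
>
>   private static class EmitIfChangedFn
>       extends DoFn<KV<String, String>, KV<String, String>> {
>     @StateId("lastEmitted")
>     private final StateSpec<ValueState<String>> lastEmittedSpec =
>         StateSpecs.value(StringUtf8Coder.of());
>
>     @ProcessElement
>     public void process(ProcessContext c,
>         @StateId("lastEmitted") ValueState<String> lastEmitted) {
>       String joined = c.element().getValue();
>       // Only emit when the joined result actually changed, suppressing
>       // redundant downstream writes. read() is null on first use.
>       if (!joined.equals(lastEmitted.read())) {
>         lastEmitted.write(joined);
>         c.output(c.element());
>       }
>     }
>   }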
>
> Cheers,
> Beau
>
> On Tue, Feb 21, 2017 at 8:04 PM Kenneth Knowles <k...@google.com> wrote:
>
> Hi Beau,
>
> I've answered inline.
>
> On Tue, Feb 21, 2017 at 5:09 PM, Beau Fabry <bfa...@zendesk.com> wrote:
>
> In cases where communicating between outputs would be an optimisation, and
> reducing inputs in parallel would also be an optimisation, how do you make
> the call between using a stateful DoFn and a Combine.perKey?
>
>
> (Standard optimization caveat: you'd make the call by measuring them both.)
>
> Usually you will be forced to make this decision not based on
> optimization, but based on whether your use case can be expressed naturally
> as one or the other.
>
> The key difference is just the one I talk about in the blog post: the
> associative `merge` operation. This enables nice optimizations, so if your
> use case fits naturally as a CombineFn then you should probably use it.
> There are exceptions, like needing fine-grained access to just a piece of
> your accumulator state, but the most common way to combine things is with
> Combine :-)
>
> You can always follow a Combine.perKey with a stateful ParDo that
> communicates between outputs, if that also presents a major savings.
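>
> (For reference, the merge in question is mergeAccumulators on CombineFn.
> A standard sketch of a per-key mean - the same shape as the example in the
> Beam docs, imports elided:)
>
>   public static class MeanFn
>       extends Combine.CombineFn<Integer, MeanFn.Accum, Double> {
>     public static class Accum implements Serializable {
>       long sum = 0;
>       long count = 0;
>     }
>
>     @Override
>     public Accum createAccumulator() { return new Accum(); }
>
>     @Override
>     public Accum addInput(Accum accum, Integer input) {
>       accum.sum += input;
>       accum.count++;
>       return accum;
>     }
>
>     @Override
>     public Accum mergeAccumulators(Iterable<Accum> accums) {
>       // Associativity: partial results computed on different workers can
>       // be merged in any grouping and still yield the same answer.
>       Accum merged = createAccumulator();
>       for (Accum accum : accums) {
>         merged.sum += accum.sum;
>         merged.count += accum.count;
>       }
>       return merged;
>     }
>
>     @Override
>     public Double extractOutput(Accum accum) {
>       return accum.count == 0 ? 0.0 : ((double) accum.sum) / accum.count;
>     }
>   }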
>
> Is there a case to be made that CombineFn could also get access to state
> within extractOutput? It seems like the only benefit of a CombineFn now is
> that the merge and addInput steps can run on multiple workers; is there a
> rule of thumb for knowing whether we have enough data per key for that to
> be significant?
>
>
> This is a big benefit a lot of the time, especially in large batch
> executions. Without this optimization opportunity, a Combine.perKey would
> execute by first doing a GroupByKey - shuffling all of your data - and then
> applying your CombineFn to each KV<K, Iterable<V>>. Instead, this can be
> optimized so that the combine happens before the shuffle, which can reduce
> the shuffled data to a single accumulator per key. This optimization is
> general enough that my description applies to all runners, though the
> benefit is smaller in streaming executions, where there is not as much
> data per key+window+triggering.
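>
> (In code, the user-facing side is a single apply; whether the combine is
> lifted ahead of the shuffle is the runner's decision. A minimal sketch
> reusing the MeanFn above, imports elided:)
>
>   static PCollection<KV<String, Double>> meanPerKey(
>       PCollection<KV<String, Integer>> input) {
>     // With lifting, each worker ships one Accum per key instead of every
>     // element; without it, a GroupByKey would shuffle all the values.
>     return input.apply(
>         Combine.<String, Integer, Double>perKey(new MeanFn()));
>   }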
>
> Kenn
>
>
>
> Cheers,
> Beau
>
> On Wed, Feb 15, 2017 at 8:53 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>
> Great post! I like the use of the earlier figure style with geometric
> shapes and colors, as well as the table analogy, which really helps in
> understanding the concepts. I am still digesting some of the consequences
> of the State API, in particular the implications of using state that you
> mention at the end. It is really good to discuss those as part of the post
> as well.
>
> I found some small typos and formatting issues, which I addressed here:
> https://github.com/apache/beam-site/pull/156
>
> Thanks for writing,
> Ismaël
>
>
> On Tue, Feb 14, 2017 at 11:50 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
> wrote:
>
> Hey Kenn,
>
> Just took a quick look and it's a great post!
>
> Thanks
> Regards
> JB
> On Feb 13, 2017, at 18:44, Kenneth Knowles <k...@google.com> wrote:
>
> Hi all,
>
> I've just published a blog post about Beam's new stateful processing
> capabilities:
>
>     https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>
> The blog post covers stateful processing from a few angles: how it works,
> how it fits into the Beam model, what you might use it for, and finally
> some examples of stateful Beam code.
>
> I'd love for you to take a look and see how this feature might apply to
> your use of Beam. And, of course, I'd also love to hear from you about it.
>
> Kenn
>