I've finally managed to understand, write and run my job using stateful and timely processing. The code is available at https://gist.github.com/calonso/14b06f4f58faf286cc79181cb46a9b85 in case anyone needs inspiration.
Thanks a lot for your help, for encouraging me to go that way, for such a great product and the amazing community you're building around it.

On Wed, Jan 10, 2018 at 6:11 PM Robert Bradshaw <[email protected]> wrote:

> Sounds like you have enough to get started. Feel free to come back
> here with more specifics if you can't get it working.
>
> On Wed, Jan 10, 2018 at 9:09 AM, Carlos Alonso <[email protected]> wrote:
> > Thanks Robert!!
> >
> > After reading this and the former post about stateful processing,
> > Kenneth's suggestions sound sensible. I'll probably give them a try!!
> > Is there anything you would like to advise me on before starting?
> >
> > Thanks!
> >
> > On Wed, Jan 10, 2018 at 10:13 AM Robert Bradshaw <[email protected]> wrote:
> >>
> >> Unfortunately, the metadata driven trigger is still just an idea, not
> >> yet implemented.
> >>
> >> A good introduction to state and timers can be found at
> >> https://beam.apache.org/blog/2017/08/28/timely-processing.html
> >>
> >> On Wed, Jan 10, 2018 at 1:08 AM, Carlos Alonso <[email protected]> wrote:
> >> > Hi Robert, Kenneth.
> >> >
> >> > Thanks a lot to both of you for your responses!!
> >> >
> >> > Kenneth, unfortunately I'm not sure we're experienced enough with
> >> > Apache Beam to get anywhere close to your suggestion, but thanks
> >> > anyway!!
> >> >
> >> > Robert, your suggestion sounds great to me, could you please provide
> >> > an example of how to use that 'metadata driven' trigger?
> >> >
> >> > Thanks!
> >> >
> >> > On Tue, Jan 9, 2018 at 9:11 PM Kenneth Knowles <[email protected]> wrote:
> >> >>
> >> >> Often, when you need or want more control than triggers provide,
> >> >> such as input-type-specific logic like yours, you can use state and
> >> >> timers in ParDo to control when to output. You lose any potential
> >> >> optimizations of Combine based on associativity/commutativity and
> >> >> assume the burden of making sure your output is sensible, but
> >> >> dropping to low-level stateful computation may be your best bet.
> >> >>
> >> >> Kenn
> >> >>
> >> >> On Tue, Jan 9, 2018 at 11:59 AM, Robert Bradshaw <[email protected]> wrote:
> >> >>>
> >> >>> We've tossed around the idea of "metadata-driven" triggers which
> >> >>> would essentially let you provide a mapping element -> metadata and
> >> >>> a monotonic CombineFn metadata* -> bool that would allow for this
> >> >>> (the AfterCount being a special case of this, with the mapping fn
> >> >>> being _ -> 1, and the CombineFn being sum(...) >= N, for size one
> >> >>> would provide a (perhaps approximate) sizing mapping fn).
> >> >>>
> >> >>> Note, however, that there's no guarantee that the trigger fires as
> >> >>> soon as possible; due to runtime characteristics a significant
> >> >>> amount of data may be buffered (or come in at once) before the
> >> >>> trigger is queried. One possibility would be to follow your
> >> >>> triggering with a DoFn that breaks up large value streams into
> >> >>> multiple manageable sized ones as needed.
> >> >>>
> >> >>> On Tue, Jan 9, 2018 at 11:43 AM, Carlos Alonso <[email protected]> wrote:
> >> >>> > Hi everyone!!
> >> >>> >
> >> >>> > I was wondering if there is an option to trigger window panes
> >> >>> > based on the size of the pane itself (rather than the number of
> >> >>> > elements).
> >> >>> >
> >> >>> > To provide a little bit more context, we're backing up a PubSub
> >> >>> > topic into GCS with the "special" feature that, depending on the
> >> >>> > "type" of the message, the GCS destination is one or another.
> >> >>> >
> >> >>> > The 'shape' of the messages published there is quite random: some
> >> >>> > of them are very frequent and small, some others very big but
> >> >>> > sparse... We have around 150 messages per second (in total) and
> >> >>> > we're firing every 15 minutes and experiencing OOM errors. We've
> >> >>> > considered firing based on the number of items as well, but given
> >> >>> > the randomness of the input, I don't think it will be a final
> >> >>> > solution either.
> >> >>> >
> >> >>> > Having a trigger based on size would be great; another option
> >> >>> > would be to have a dynamic number of shards for the PTransform
> >> >>> > that actually writes the files.
> >> >>> >
> >> >>> > What is your recommendation for this use case?
> >> >>> >
> >> >>> > Thanks!!
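
For anyone reading the archive, here is a minimal sketch of the size-based splitting logic discussed in the thread (Robert's idea of breaking a large value stream into several manageable ones). It is plain Python, not the Beam API and not the code from the gist; the names split_by_size, max_batch_bytes and size_fn are hypothetical, chosen only for illustration. In a real pipeline the buffer would presumably live in per-key state, with a timer flushing whatever is left over, as described in the timely-processing blog post linked above.

```python
from typing import Callable, Iterable, Iterator, List


def split_by_size(
    values: Iterable[bytes],
    max_batch_bytes: int,
    size_fn: Callable[[bytes], int] = len,
) -> Iterator[List[bytes]]:
    """Yield batches whose total size stays at or below max_batch_bytes.

    A single value larger than the limit is emitted alone, because this
    sketch does not split individual values.
    """
    batch: List[bytes] = []
    batch_bytes = 0
    for value in values:
        value_bytes = size_fn(value)
        # Flush the current batch first if this value would push it over the limit.
        if batch and batch_bytes + value_bytes > max_batch_bytes:
            yield batch
            batch, batch_bytes = [], 0
        batch.append(value)
        batch_bytes += value_bytes
    # Emit whatever remains once the input is exhausted.
    if batch:
        yield batch


# Hypothetical input: three small messages, one big one, one tiny one.
messages = [b"a" * 40, b"b" * 40, b"c" * 40, b"d" * 200, b"e" * 10]
batches = list(split_by_size(messages, max_batch_bytes=100))
batch_sizes = [sum(len(m) for m in b) for b in batches]
print(batch_sizes)
```

The size_fn argument plays the role of the "sizing mapping fn" from Robert's message: it can be exact (len) or approximate, and the flush condition is the sum(...) >= N style check applied to sizes instead of counts.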
