If you're using State solely to batch elements together, I would recommend avoiding state altogether. Instead, have a DoFn that holds a List<Element> as a member variable: in your ProcessElement method, add each element to the list and emit the list once it reaches your threshold, and in FinishBundle, emit whatever batch remains.
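A minimal sketch of that buffering logic, assuming a byte-size threshold like the 512 KB DLP limit in the use case below. It is plain Java with no Beam dependency so it runs on its own. `SizeBoundedBatcher` and the `sizeOf` estimator are hypothetical names, not Beam APIs. In an actual DoFn, the batcher would be a member field, `add()` would be called from the `@ProcessElement` method (outputting any batch it returns), and `flush()` would be called from `@FinishBundle`. In Beam Java, output from `@FinishBundle` goes through `FinishBundleContext`, which requires an explicit timestamp and window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.ToLongFunction;

// Hypothetical helper (not a Beam API): buffers elements until adding the next
// one would push the estimated batch size over maxBatchBytes.
class SizeBoundedBatcher<T> {
    private final long maxBatchBytes;
    private final ToLongFunction<T> sizeOf; // hypothetical per-element size estimate, in bytes
    private List<T> buffer = new ArrayList<>();
    private long bufferedBytes = 0;

    SizeBoundedBatcher(long maxBatchBytes, ToLongFunction<T> sizeOf) {
        this.maxBatchBytes = maxBatchBytes;
        this.sizeOf = sizeOf;
    }

    // Would be called from @ProcessElement. Returns the full batch when the new
    // element does not fit; the new element then starts the next batch.
    Optional<List<T>> add(T element) {
        long size = sizeOf.applyAsLong(element);
        Optional<List<T>> full = Optional.empty();
        if (!buffer.isEmpty() && bufferedBytes + size > maxBatchBytes) {
            full = Optional.of(buffer);
            buffer = new ArrayList<>();
            bufferedBytes = 0;
        }
        // An element larger than maxBatchBytes on its own still forms its own batch.
        buffer.add(element);
        bufferedBytes += size;
        return full;
    }

    // Would be called from @FinishBundle: returns the partial batch (possibly empty) and resets.
    List<T> flush() {
        List<T> remaining = buffer;
        buffer = new ArrayList<>();
        bufferedBytes = 0;
        return remaining;
    }

    public static void main(String[] args) {
        // Usage with strings, treating each character as one byte, with a 10-byte limit.
        SizeBoundedBatcher<String> batcher = new SizeBoundedBatcher<>(10, (String s) -> s.length());
        for (String s : new String[] {"aaaa", "bbbb", "cc", "d"}) {
            batcher.add(s).ifPresent(batch -> System.out.println("emit " + batch)); // prints: emit [aaaa, bbbb, cc]
        }
        System.out.println("finish bundle " + batcher.flush()); // prints: finish bundle [d]
    }
}
```

Because batches never span bundles, this approach needs no GroupByKey, so there is no stateful ParDo for the runner to rewrite with the GbkBeforeStatefulParDo step. The trade-off is that each batch is capped by the size of the bundle the runner hands to the DoFn.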
On Thu, Apr 23, 2020 at 9:36 AM Aniruddh Sharma <asharma...@gmail.com> wrote:

> Thanks Luke for your response.
>
> My use case is the following:
> a) I read data from BQ (TableRow).
> b) I convert it into Table.Row for DLP calls.
> c) I have to batch the Table.Row collection up to a max size of 512 KB (i.e., fit
> many rows from BQ into a single DLP table) and call DLP.
>
> Functionally, I don't need a key or a window; I just want to fit rows into a
> DLP table up to a max size.
>
> In batch mode, when I call the stateful API, Dataflow adds a
> "BatchStatefulParDoOverrides.GroupByKeyAndSortValuesOnly" step, and this step
> is super slow: it has been running on a 50-node cluster over 800 GB of data for
> the last 10 hours.
>
> This step is not added when I run Dataflow in streaming mode, but I can't use
> streaming mode for other reasons.
>
> So I am trying to understand the following:
> a) Can I somehow give the Dataflow runner a hint not to add the
> "BatchStatefulParDoOverrides.GroupByKeyAndSortValuesOnly" step at all? Then I
> don't have any issues.
> b) If it does add this step, how should I choose my ARTIFICIALLY created keys so
> that the step executes as fast as possible? It sorts records by timestamp.
> Since I don't have any functional key requirement, should I use the same key for
> all rows, a random key for some rows, or a random key for each row? And what
> timestamps should I assign (the same for all rows?) to make this step run faster?
>
> Thanks
> Aniruddh
>
> On 2020/04/23 16:15:44, Luke Cwik <lc...@google.com> wrote:
> > Stateful & timely operations are always per key and window, which is why the
> > GbkBeforeStatefulParDo is being added. Do you not need your stateful &
> > timely operation to be done per key and window? If so, can you explain
> > further?
> >
> > On Thu, Apr 23, 2020 at 6:29 AM Aniruddh Sharma <asharma...@gmail.com>
> > wrote:
> >
> > > Hi Kenn
> > >
> > > Thanks for your guidance. I understand that batch mode waits for the
> > > previous stage.
> > > But the real issue in this particular case is not only this.
> > >
> > > The Dataflow runner automatically adds a step,
> > > "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo", which not only waits
> > > for the previous stage but waits for a very, very, very long time. Is there a
> > > way to give the Dataflow runner a hint not to add this step, since in my case
> > > I functionally do not require it?
> > >
> > > Thanks for your suggestion; I will create another thread to understand the BQ
> > > options.
> > >
> > > Thanks
> > > Aniruddh
> > >
> > > On 2020/04/23 03:51:31, Kenneth Knowles <k...@apache.org> wrote:
> > > > The definition of batch mode for Dataflow is this: completely compute the
> > > > result of one stage of computation before starting the next stage. There is
> > > > no way around this. It does not have to do with using state and timers.
> > > >
> > > > If you are working with state & timers & triggers, and you are hoping for
> > > > output before the pipeline is completely terminated, then you most likely
> > > > want streaming mode. Perhaps it is best to investigate the BQ read
> > > > performance issue.
> > > >
> > > > Kenn
> > > >
> > > > On Wed, Apr 22, 2020 at 4:04 PM Aniruddh Sharma <asharma...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi
> > > > >
> > > > > I am reading a bounded collection from BQ.
> > > > >
> > > > > I have to use a Stateful & Timely operation.
> > > > >
> > > > > a) I am invoking the job in batch mode. The Dataflow runner adds a step,
> > > > > "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo", which has a
> > > > > partitionBy. This partitionBy waits for all the data to arrive and becomes
> > > > > a bottleneck. When I read its documentation, it seems its purpose is to be
> > > > > added when there are no windows.
> > > > >
> > > > > I tried adding windows and triggering them before the stateful step, but
> > > > > everything comes to this partitionBy step and waits until all the data is
> > > > > here.
> > > > >
> > > > > Is there a way to write the code differently (with windows, etc.) or to
> > > > > give Dataflow a hint not to add this step?
> > > > >
> > > > > b) I don't want to run this job in streaming mode. When I run it in
> > > > > streaming mode, the Dataflow runner does not add this step, but in
> > > > > streaming mode the BQ read becomes a bottleneck.
> > > > >
> > > > > So either I have to figure out how to read BQ faster if I run the job in
> > > > > streaming mode, or how to bypass this partitionBy from
> > > > > "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo" if I run the job in
> > > > > batch mode.
> > > > >
> > > > > Thanks
> > > > > Aniruddh