Thanks Robert and Luke. This approach seems good to me, and I am trying it. I have to include a GroupBy to make an Iterable of rows available to the ParDo that does the batching. Now the GroupBy is the bottleneck: it has been running for the last 2 hours and has processed only 40 GB of data (I am still waiting for the remaining hundreds of GB).
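The choice of key for that GroupBy trades parallelism against batch size. One key for every row puts all rows in a single group, which one worker then processes. A fresh key per row produces groups of one row, which leaves nothing to batch. A bounded set of random shard keys sits between the two. The minimal plain-Java sketch below has no Beam dependency and only simulates how rows would spread across keys. `ShardedGrouping.groupByRandomShard` and the shard count are hypothetical. In a pipeline, the key would be attached before `GroupByKey.create()`, for example with Beam's `WithKeys`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical helper (plain Java, not Beam code): simulates assigning each
// row a random shard key in [0, numShards) and grouping rows by that key,
// which is what a GroupByKey over artificially keyed rows would produce.
final class ShardedGrouping {
  private ShardedGrouping() {}

  static <T> Map<Integer, List<T>> groupByRandomShard(
      List<T> rows, int numShards, long seed) {
    if (numShards <= 0) {
      throw new IllegalArgumentException("numShards must be positive");
    }
    Random random = new Random(seed); // seeded so the sketch is reproducible
    Map<Integer, List<T>> groups = new HashMap<>();
    for (T row : rows) {
      int key = random.nextInt(numShards); // artificial key in [0, numShards)
      groups.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
    }
    return groups;
  }
}
```

With `numShards = 1`, every row lands in one group. As the shard count approaches the row count, groups shrink toward a single row each. The shard count that works best would need to be tuned against the number of workers and the 512 KB batch target. This sketch does not establish a fixed rule.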
Currently I use GroupByKey.create(). What is the recommended way to choose the key so that it executes faster: the same key for all rows, a different key for each row, or the same key for a group of rows?

Thanks
Aniruddh

On Thu, Apr 23, 2020 at 12:47 PM Luke Cwik <lc...@google.com> wrote:
> As Robert suggested, what prevents you from doing:
> ReadFromBQ -> ParDo(BatchInMemory) -> DLP
> where BatchInMemory stores elements in the @ProcessElement method in an
> in-memory list and produces output every time the list is large enough,
> with a final output in the @FinishBundle method?
>
> On Thu, Apr 23, 2020 at 9:42 AM Aniruddh Sharma <asharma...@gmail.com>
> wrote:
>
>> Hi Luke
>>
>> Sorry, I forgot to mention the functions. Dataflow adds the following
>> functions, and ["PartitionKeys", new GroupByKeyAndSortValuesOnly] is super
>> slow. How should I choose keys to make it faster?
>>
>>     .apply("ReifyWindows", ParDo.of(new ReifyWindowedValueFn<>()))
>>     .setCoder(
>>         KvCoder.of(
>>             keyCoder,
>>             KvCoder.of(InstantCoder.of(),
>>                 WindowedValue.getFullCoder(kvCoder, windowCoder))))
>>
>>     // Group by key and sort by timestamp, dropping windows as they are reified
>>     .apply("PartitionKeys", new GroupByKeyAndSortValuesOnly<>())
>>
>>     // The GBKO sets the windowing strategy to the global default
>>     .setWindowingStrategyInternal(inputWindowingStrategy);
>>
>> Thanks
>> Aniruddh
>>
>> On 2020/04/23 16:35:58, Aniruddh Sharma <asharma...@gmail.com> wrote:
>> > Thanks Luke for your response.
>> >
>> > My use case is the following:
>> > a) I read data from BQ (TableRow).
>> > b) I convert it into Table.Row for DLP calls.
>> > c) I have to batch the Table.Row collection up to a max size of 512 KB
>> > (i.e. fit many rows from BQ into a single DLP table) and call DLP.
>> >
>> > Functionally, I don't need a key or a window, as I just want to fit
>> > rows into a DLP table up to a max size.
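Luke's BatchInMemory suggestion needs no key. It buffers elements within a bundle and emits a batch whenever the next element would push the batch past the size cap, which is 512 KB for DLP in this use case. Whatever remains is emitted at the end of the bundle. The plain-Java sketch below models only that buffering logic, under stated assumptions. `SizeCappedBatcher`, `add`, and `flush` are hypothetical names. Rows are modeled as Strings whose size is their UTF-8 byte length, which is also an assumption. In a Beam DoFn, `add` would be called from `@ProcessElement` and `flush` from `@FinishBundle`.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of the buffering behind a "BatchInMemory" DoFn.
// Rows are modeled as Strings; a row's size is its UTF-8 byte length (assumption).
final class SizeCappedBatcher {
  private final long maxBatchBytes;
  private final List<String> buffer = new ArrayList<>();
  private long bufferedBytes = 0;

  SizeCappedBatcher(long maxBatchBytes) {
    this.maxBatchBytes = maxBatchBytes;
  }

  // Per-element step (what @ProcessElement would call). If adding this row would
  // exceed the cap, the current buffer is returned as a completed batch and the
  // row starts a new one. A single row larger than the cap still forms its own
  // batch; real code would need to handle that case.
  Optional<List<String>> add(String row) {
    long rowBytes = row.getBytes(StandardCharsets.UTF_8).length;
    Optional<List<String>> completed = Optional.empty();
    if (!buffer.isEmpty() && bufferedBytes + rowBytes > maxBatchBytes) {
      completed = Optional.of(new ArrayList<>(buffer));
      buffer.clear();
      bufferedBytes = 0;
    }
    buffer.add(row);
    bufferedBytes += rowBytes;
    return completed;
  }

  // End-of-bundle step (what @FinishBundle would call): emits the final,
  // possibly partial, batch, or nothing if the buffer is empty.
  Optional<List<String>> flush() {
    if (buffer.isEmpty()) {
      return Optional.empty();
    }
    List<String> last = new ArrayList<>(buffer);
    buffer.clear();
    bufferedBytes = 0;
    return Optional.of(last);
  }
}
```

Because batches never cross bundle boundaries, batch sizes depend on how the runner splits bundles. This sketch trades a perfectly full final batch for avoiding any shuffle.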
>> >
>> > In batch mode, when I call the stateful API, it adds a
>> > "BatchStatefulParDoOverrides.GroupByKeyAndSortValuesOnly" step, and this
>> > step is super slow. It has been running on a 50-node cluster for 800 GB of
>> > data for the last 10 hours.
>> >
>> > This step is not added when I run Dataflow in streaming mode, but I can't
>> > use streaming mode for other reasons.
>> >
>> > So I am trying to understand the following:
>> > a) Can I somehow hint to the Dataflow runner not to add the
>> > "BatchStatefulParDoOverrides.GroupByKeyAndSortValuesOnly" step at all?
>> > Then I wouldn't have any issues.
>> > b) If it does add this step, how should I choose my ARTIFICIALLY created
>> > keys so that the step executes as fast as possible? It sorts records by
>> > timestamp. As I don't have any functional key requirement, should I use
>> > the same key for all rows, a random key for groups of rows, or a random
>> > key for each row? And what timestamps should I assign (the same for all
>> > rows?) to make this step faster?
>> >
>> > Thanks
>> > Aniruddh
>> >
>> > On 2020/04/23 16:15:44, Luke Cwik <lc...@google.com> wrote:
>> > > Stateful & timely operations are always per key and window, which is
>> > > why the GbkBeforeStatefulParDo is being added. Do you not need your
>> > > stateful & timely operation to be done per key and window? If so, can
>> > > you explain further?
>> > >
>> > > On Thu, Apr 23, 2020 at 6:29 AM Aniruddh Sharma <asharma...@gmail.com>
>> > > wrote:
>> > >
>> > > > Hi Kenn
>> > > >
>> > > > Thanks for your guidance. I understand that batch mode waits for the
>> > > > previous stage, but that is not the only issue in this particular case.
>> > > >
>> > > > The Dataflow runner automatically adds a
>> > > > "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo" step, which not
>> > > > only waits for the previous stage but waits for a very, very long time.
>> > > > Is there a way to hint to the Dataflow runner not to add this step, as
>> > > > in my case I do not functionally require it?
>> > > >
>> > > > Thanks for your suggestion; I will start another thread to understand
>> > > > the BQ options.
>> > > >
>> > > > Thanks
>> > > > Aniruddh
>> > > >
>> > > > On 2020/04/23 03:51:31, Kenneth Knowles <k...@apache.org> wrote:
>> > > > > The definition of batch mode for Dataflow is this: completely compute
>> > > > > the result of one stage of computation before starting the next
>> > > > > stage. There is no way around this. It does not have to do with
>> > > > > using state and timers.
>> > > > >
>> > > > > If you are working with state & timers & triggers, and you are hoping
>> > > > > for output before the pipeline is completely terminated, then you
>> > > > > most likely want streaming mode. Perhaps it is best to investigate
>> > > > > the BQ read performance issue.
>> > > > >
>> > > > > Kenn
>> > > > >
>> > > > > On Wed, Apr 22, 2020 at 4:04 PM Aniruddh Sharma <asharma...@gmail.com>
>> > > > > wrote:
>> > > > >
>> > > > > > Hi
>> > > > > >
>> > > > > > I am reading a bounded collection from BQ.
>> > > > > >
>> > > > > > I have to use a stateful & timely operation.
>> > > > > >
>> > > > > > a) I am invoking the job in batch mode. The Dataflow runner adds a
>> > > > > > "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo" step, which
>> > > > > > has a partitionBy. This partitionBy waits for all the data to
>> > > > > > arrive and becomes a bottleneck. From its documentation, it seems
>> > > > > > it is meant to be added when there are no windows.
>> > > > > >
>> > > > > > I tried adding windows and triggering them before the stateful
>> > > > > > step, but everything arrives at this partitionBy step and waits
>> > > > > > until all the data is here.
>> > > > > >
>> > > > > > Is there a way to write the code differently (e.g. with windows)
>> > > > > > or to give Dataflow a hint not to add this step?
>> > > > > >
>> > > > > > b) I don't want to run this job in streaming mode. When I run it in
>> > > > > > streaming mode, the Dataflow runner does not add this step, but in
>> > > > > > streaming mode the BQ read becomes a bottleneck.
>> > > > > >
>> > > > > > So either I have to solve how to read BQ faster if I run the job in
>> > > > > > streaming mode, or how to bypass this partitionBy from
>> > > > > > "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo" if I run the
>> > > > > > job in batch mode.
>> > > > > >
>> > > > > > Thanks
>> > > > > > Aniruddh