Thanks, Robert and Luke.

This approach seems good to me. I am trying it, but I had to include a
GroupBy to make an Iterable<Rows> available to the ParDo function.
Now the GroupBy is a bottleneck: it has been running for the last 2 hours and
has processed only 40 GB of data (still waiting for the remaining hundreds of GB).
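For reference, the core of the ParDo(BatchInMemory) idea Luke suggested below is just size-bounded buffering. Here is a minimal plain-Java sketch of that logic (the Beam @ProcessElement/@FinishBundle plumbing is omitted, and the class and method names are made up for illustration):

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the buffering a BatchInMemory DoFn would do:
// accumulate rows until adding the next one would exceed maxBytes,
// then emit the batch and start a new one. In a real DoFn the emit
// would happen in @ProcessElement, and the final partial batch would
// be flushed in @FinishBundle.
public class RowBatcher {
    public static List<List<String>> batchBySize(List<String> rows, int maxBytes) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int currentBytes = 0;
        for (String row : rows) {
            int rowBytes = row.getBytes().length;
            if (!current.isEmpty() && currentBytes + rowBytes > maxBytes) {
                batches.add(current);          // emit a full batch
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(row);
            currentBytes += rowBytes;
        }
        if (!current.isEmpty()) {
            batches.add(current);              // final flush (the @FinishBundle case)
        }
        return batches;
    }
}
```

In the real pipeline each inner list would become one DLP table of at most 512 KB; no GroupByKey is needed because the buffering happens per bundle inside the DoFn.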

Currently I am using GroupByKey.create().

What is the recommended way to choose keys to make it execute faster: the
same key for all rows, a different key for each row, or the same key for a
group of rows?
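For context, one common pattern (an assumption on my side, not Dataflow-specific advice) is a small fixed number of random shard keys: that avoids both a single hot key (same key for all rows, so one worker does everything) and one-element groups (a distinct key per row, so grouping buys nothing). A hypothetical sketch:

```java
import java.util.concurrent.ThreadLocalRandom;

// Sketch of random shard-key assignment: each row gets a key drawn
// from a small fixed range [0, numShards), so a following GroupByKey
// spreads the data across roughly numShards parallel groups.
public class ShardKeys {
    public static int shardKeyFor(int numShards) {
        return ThreadLocalRandom.current().nextInt(numShards);
    }
}
```

A numShards of a few times the worker count would keep all workers busy without creating tiny groups; the exact value here is a guess, not a measured recommendation.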

Thanks
Aniruddh

On Thu, Apr 23, 2020 at 12:47 PM Luke Cwik <lc...@google.com> wrote:

> As Robert suggested, what prevents you from doing:
> ReadFromBQ -> ParDo(BatchInMemory) -> DLP
> where BatchInMemory stores elements from the @ProcessElement method in an
> in-memory list and produces output every time the list is large enough, with
> a final output in the @FinishBundle method?
>
> On Thu, Apr 23, 2020 at 9:42 AM Aniruddh Sharma <asharma...@gmail.com>
> wrote:
>
>> Hi Luke
>>
>> Sorry, I forgot to mention the functions. Dataflow adds the following
>> steps, and ["PartitionKeys", new GroupByKeyAndSortValuesOnly] is super slow.
>> How should I choose keys to make it faster?
>>
>>  .apply("ReifyWindows", ParDo.of(new ReifyWindowedValueFn<>()))
>>           .setCoder(
>>               KvCoder.of(
>>                   keyCoder,
>>                   KvCoder.of(InstantCoder.of(),
>> WindowedValue.getFullCoder(kvCoder, windowCoder))))
>>
>>           // Group by key and sort by timestamp, dropping windows as they
>> are reified
>>           .apply("PartitionKeys", new GroupByKeyAndSortValuesOnly<>())
>>
>>           // The GBKO sets the windowing strategy to the global default
>>           .setWindowingStrategyInternal(inputWindowingStrategy);
>>
>> Thanks
>> Aniruddh
>>
>> On 2020/04/23 16:35:58, Aniruddh Sharma <asharma...@gmail.com> wrote:
>> > Thanks Luke for your response.
>> >
>> > My use case is following.
>> > a) I read data from BQ (TableRow)
>> > b) Convert it into (Table.Row) for DLP calls.
>> > c) I have to batch the Table.Row collection up to a max size of 512 KB
>> (i.e. fit many rows from BQ into a single DLP table) and call DLP.
>> >
>> > Functionally, I don't need a key or a window, as I just want to
>> fit rows into a DLP table up to a max size.
>> >
>> > In batch mode, when I call the stateful API,
>> > it adds a "BatchStatefulParDoOverrides.GroupByKeyAndSortValuesOnly"
>> step, and this step is super slow: it has been running on a 50-node cluster
>> for 800 GB of data for the last 10 hours.
>> >
>> > This step is not added when I call Dataflow in streaming mode, but I
>> can't use streaming mode for other reasons.
>> >
>> > So I am trying to understand the following:
>> > a) Can I somehow give the Dataflow runner a hint not to add the
>> "BatchStatefulParDoOverrides.GroupByKeyAndSortValuesOnly" step at all? Then I
>> don't have any issues.
>> > b) If it does add this step, how should I choose my artificially
>> created keys so that the step can execute as fast as possible? It does a sort
>> on record timestamps. As I don't have any functional key requirement,
>> should I choose the same key for all rows, a random key for some rows, or a
>> random key for each row? And what timestamps should I add (the same for all
>> rows?) to make this function work faster?
>> >
>> > Thanks
>> > Aniruddh
>> >
>> > On 2020/04/23 16:15:44, Luke Cwik <lc...@google.com> wrote:
>> > > Stateful & timely operations are always per key and window, which is
>> why the GbkBeforeStatefulParDo is being added. Do you not need your
>> > > stateful & timely operation to be done per key and window? If so, can
>> > > you explain further?
>> > >
>> > > On Thu, Apr 23, 2020 at 6:29 AM Aniruddh Sharma <asharma...@gmail.com
>> >
>> > > wrote:
>> > >
>> > > > Hi Kenn
>> > > >
>> > > > Thanks for your guidance. I understand that batch mode waits for the
>> previous
>> > > > stage, but the real issue in this particular case is not only that.
>> > > >
>> > > > The Dataflow runner automatically adds a step,
>> > > > "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo", which not only
>> waits
>> > > > for the previous stage but waits for a very, very long time. Is
>> there a
>> > > > way to give the Dataflow runner a hint not to add this step, as in my
>> case I
>> > > > functionally do not require it?
>> > > >
>> > > > Thanks for your suggestion; I will create another thread to
>> understand BQ
>> > > > options.
>> > > >
>> > > > Thanks
>> > > > Aniruddh
>> > > >
>> > > > On 2020/04/23 03:51:31, Kenneth Knowles <k...@apache.org> wrote:
>> > > > > The definition of batch mode for Dataflow is this: completely
>> compute the
>> > > > > result of one stage of computation before starting the next
>> stage. There
>> > > > is
>> > > > > no way around this. It does not have to do with using state and
>> timers.
>> > > > >
>> > > > > If you are working with state & timers & triggers, and you are
>> hoping for
>> > > > > output before the pipeline is completely terminated, then you
>> most likely
>> > > > > want streaming mode. Perhaps it is best to investigate the BQ read
>> > > > > performance issue.
>> > > > >
>> > > > > Kenn
>> > > > >
>> > > > > On Wed, Apr 22, 2020 at 4:04 PM Aniruddh Sharma <
>> asharma...@gmail.com>
>> > > > > wrote:
>> > > > >
>> > > > > > Hi
>> > > > > >
>> > > > > > I am reading a bounded collection from BQ.
>> > > > > >
>> > > > > > I have to use a Stateful & Timely operation.
>> > > > > >
>> > > > > > a) I am invoking the job in batch mode. The Dataflow runner adds a
>> > > > > > step, "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo", which
>> > > > has a partitionBy.
>> > > > > > This partitionBy waits for all the data to arrive and becomes a
>> > > > bottleneck.
>> > > > > > When I read its documentation, it seems its objective is to be
>> > > > added
>> > > > > > when there are no windows.
>> > > > > >
>> > > > > > I tried adding windows and triggering them before the stateful
>> step, but
>> > > > > > everything comes to this partitionBy step and waits until all
>> the data is
>> > > > here.
>> > > > > >
>> > > > > > Is there a way to write the code (with windows, etc.) or to
>> give
>> > > > > > Dataflow a hint not to add this step?
>> > > > > >
>> > > > > > b) I don't want to call this job in streaming mode. When I call
>> it in
>> > > > > > streaming mode, the Dataflow runner does not add this step,
>> but in
>> > > > > > streaming the BQ read becomes a bottleneck.
>> > > > > >
>> > > > > > So either I have to work out how to read BQ faster if I call the
>> > > > job in streaming
>> > > > > > mode, or how to bypass this partitionBy from
>> > > > > > "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo" if I
>> invoke the job in
>> > > > > > batch mode.
>> > > > > >
>> > > > > > Thanks
>> > > > > > Aniruddh
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
