Hi Luke

Sorry, I forgot to mention the functions. Dataflow adds the following
transform, and the "PartitionKeys" step (new GroupByKeyAndSortValuesOnly<>())
is extremely slow. How should I choose keys to make it faster?

 .apply("ReifyWindows", ParDo.of(new ReifyWindowedValueFn<>()))
          .setCoder(
              KvCoder.of(
                  keyCoder,
                  KvCoder.of(InstantCoder.of(), 
WindowedValue.getFullCoder(kvCoder, windowCoder))))

          // Group by key and sort by timestamp, dropping windows as they are 
reified
          .apply("PartitionKeys", new GroupByKeyAndSortValuesOnly<>())

          // The GBKO sets the windowing strategy to the global default
          .setWindowingStrategyInternal(inputWindowingStrategy);
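
For example, the "random key for some rows" option I mention below would look
roughly like this (numShards and MyStatefulBatchFn are placeholders, not my
actual code):

    int numShards = 50;  // placeholder: roughly one key per worker

    PCollection<KV<Integer, Table.Row>> keyed =
        rows.apply("AttachArtificialKey",
            WithKeys.of((Table.Row r) -> ThreadLocalRandom.current().nextInt(numShards))
                .withKeyType(TypeDescriptors.integers()));

    keyed.apply("BatchForDlp", ParDo.of(new MyStatefulBatchFn()));

The idea would be to spread the "PartitionKeys" work over roughly numShards
parallel keys instead of a single key.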

Thanks
Aniruddh

On 2020/04/23 16:35:58, Aniruddh Sharma <asharma...@gmail.com> wrote: 
> Thanks Luke for your response.
> 
> My use case is the following:
> a) Read data from BQ (TableRow).
> b) Convert it into (Table.Row) for DLP calls.
> c) Batch the Table.Row collection up to a max size of 512 KB (i.e., fit as
> many rows from BQ as possible into a single DLP table) and call DLP.
> 
> Functionally, I don't need a key or a window, as I just want to fit rows into
> a DLP table up to a max size.
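> 
> Roughly, the batching step I have in mind looks like this sketch (class and
> state names are made up, and flushing the last partial batch when the window
> expires is left out for brevity):
> 
>     class BatchRowsFn extends DoFn<KV<Integer, Table.Row>, List<Table.Row>> {
>       private static final long MAX_BATCH_BYTES = 512 * 1024;
> 
>       @StateId("buffer")
>       private final StateSpec<BagState<Table.Row>> bufferSpec =
>           StateSpecs.bag(ProtoCoder.of(Table.Row.class));
> 
>       @StateId("bufferBytes")
>       private final StateSpec<ValueState<Long>> bufferBytesSpec =
>           StateSpecs.value(VarLongCoder.of());
> 
>       @ProcessElement
>       public void processElement(
>           ProcessContext c,
>           @StateId("buffer") BagState<Table.Row> buffer,
>           @StateId("bufferBytes") ValueState<Long> bufferBytes) {
>         Table.Row row = c.element().getValue();
>         Long stored = bufferBytes.read();
>         long current = stored == null ? 0L : stored;
>         long rowBytes = row.getSerializedSize();
>         if (current + rowBytes > MAX_BATCH_BYTES) {
>           // The current batch would exceed the DLP limit: emit it as one
>           // DLP table and start a new batch.
>           List<Table.Row> batch = new ArrayList<>();
>           buffer.read().forEach(batch::add);
>           c.output(batch);
>           buffer.clear();
>           current = 0L;
>         }
>         buffer.add(row);
>         bufferBytes.write(current + rowBytes);
>       }
>     }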
> 
> In batch mode, when I use the stateful API, Dataflow adds a
> "BatchStatefulParDoOverrides.GroupByKeyAndSortValuesOnly" step, and this step
> is extremely slow. It has been running on a 50-node cluster for 800 GB of data
> for the last 10 hours.
> 
> This step is not added when I run Dataflow in streaming mode, but I can't use
> streaming mode for other reasons.
> 
> So I am trying to understand the following:
> a) Can I somehow hint to the Dataflow runner not to add the
> "BatchStatefulParDoOverrides.GroupByKeyAndSortValuesOnly" step at all? If it
> is not added, I don't have any issues.
> b) If it does add this step, how should I choose my artificially created keys
> so that the step executes as fast as possible? It does a sort on the records'
> timestamps. Since I have no functional key requirement, should I use the same
> key for all rows, a random key shared by some rows, or a random key per row?
> And what timestamps should I assign (e.g., the same for all rows) to make this
> step run faster?
> 
> Thanks
> Aniruddh
> 
> On 2020/04/23 16:15:44, Luke Cwik <lc...@google.com> wrote: 
> > Stateful & timely operations are always per key and window which is the
> > GbkBeforeStatefulParDo is being added. Do you not need your stateful &
> > timely operation to be done per key and window, if so can you explain
> > further?
> > 
> > On Thu, Apr 23, 2020 at 6:29 AM Aniruddh Sharma <asharma...@gmail.com>
> > wrote:
> > 
> > > Hi Kenn
> > >
> > > Thanks for your guidance. I understand that batch mode waits for the
> > > previous stage, but that is not the only issue in this particular case.
> > >
> > > The Dataflow runner automatically adds a step,
> > > "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo", which not only waits
> > > for the previous stage but also takes a very long time. Is there a way to
> > > hint to the Dataflow runner not to add this step, since in my case I do not
> > > functionally require it?
> > >
> > > Thanks for your suggestion; I will create another thread to understand the
> > > BQ options.
> > >
> > > Thanks
> > > Aniruddh
> > >
> > > On 2020/04/23 03:51:31, Kenneth Knowles <k...@apache.org> wrote:
> > > > The definition of batch mode for Dataflow is this: completely compute the
> > > > result of one stage of computation before starting the next stage. There is
> > > > no way around this. It does not have to do with using state and timers.
> > > >
> > > > If you are working with state & timers & triggers, and you are hoping for
> > > > output before the pipeline is completely terminated, then you most likely
> > > > want streaming mode. Perhaps it is best to investigate the BQ read
> > > > performance issue.
> > > >
> > > > Kenn
> > > >
> > > > On Wed, Apr 22, 2020 at 4:04 PM Aniruddh Sharma <asharma...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi
> > > > >
> > > > > I am reading a bounded collection from BQ.
> > > > >
> > > > > I have to use a Stateful & Timely operation.
> > > > >
> > > > > a) I am invoking the job in batch mode. The Dataflow runner adds a step,
> > > > > "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo", which has a
> > > > > partitionBy. This partitionBy waits for all the data to arrive and becomes
> > > > > a bottleneck. From its documentation, it seems it is meant to be added
> > > > > when there are no windows.
> > > > >
> > > > > I tried adding windows and triggering them before the stateful step, but
> > > > > everything still funnels into this partitionBy step and waits until all
> > > > > the data is present.
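> > > > >
> > > > > (What I tried was roughly the following; the window size, trigger, and
> > > > > key type are just illustrative:)
> > > > >
> > > > >   input.apply("WindowBeforeStateful",
> > > > >       Window.<KV<Integer, TableRow>>into(
> > > > >               FixedWindows.of(Duration.standardMinutes(5)))
> > > > >           .triggering(Repeatedly.forever(
> > > > >               AfterProcessingTime.pastFirstElementInPane()
> > > > >                   .plusDelayOf(Duration.standardSeconds(30))))
> > > > >           .withAllowedLateness(Duration.ZERO)
> > > > >           .discardingFiredPanes());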
> > > > >
> > > > > Is there a way to write the code differently (e.g., using windows) or to
> > > > > give Dataflow a hint so that this step is not added?
> > > > >
> > > > > b) I don't want to run this job in streaming mode. When I run it in
> > > > > streaming mode, the Dataflow runner does not add this step, but then the
> > > > > BQ read becomes a bottleneck.
> > > > >
> > > > > So I either have to figure out how to read from BQ faster if I run the
> > > > > job in streaming mode, or how to bypass the partitionBy from
> > > > > "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo" if I run the job in
> > > > > batch mode.
> > > > >
> > > > > Thanks
> > > > > Aniruddh
> > > > >
> > > > >
> > > > >
> > > > >
> > > >
> > >
> > 
> 
