I may have misinterpreted your email; I thought you didn't need keys at
all. If that's actually the case, you don't need a GroupByKey: just have
your DoFn take Rows as input and emit List<Row> as output. That is, it's a
DoFn<Row, List<Row>>.

You can buffer multiple Rows in an instance variable between
@ProcessElement calls. For example,

class MyBufferingDoFn<T> extends DoFn<T, List<T>> {
  private List<T> buffer = new ArrayList<>();

  @ProcessElement
  public void processElement(@Element T elt, OutputReceiver<List<T>> out) {
    buffer.add(elt);
    if (buffer.size() > 100) {
      out.output(buffer);
      buffer = new ArrayList<>();
    }
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext ctx) {
    // @FinishBundle receives a FinishBundleContext rather than an
    // OutputReceiver, so the leftover elements need an explicit timestamp
    // and window (GlobalWindow here, assuming an unwindowed batch pipeline).
    if (!buffer.isEmpty()) {
      ctx.output(buffer, Instant.now(), GlobalWindow.INSTANCE);
      buffer = new ArrayList<>();
    }
  }
}
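The batching logic itself is independent of Beam; here is a plain-Java
sketch of the same idea (the Batcher class and its method names are
hypothetical, just to show the accumulate/emit/flush pattern):

```java
import java.util.ArrayList;
import java.util.List;

// Accumulates elements and emits a full buffer every `limit` elements;
// flush() emits whatever is left, mirroring what @FinishBundle does.
class Batcher<T> {
  private final int limit;
  private List<T> buffer = new ArrayList<>();
  private final List<List<T>> emitted = new ArrayList<>();

  Batcher(int limit) {
    this.limit = limit;
  }

  void add(T elt) {
    buffer.add(elt);
    if (buffer.size() >= limit) {
      emitted.add(buffer);
      buffer = new ArrayList<>();
    }
  }

  void flush() {
    if (!buffer.isEmpty()) {
      emitted.add(buffer);
      buffer = new ArrayList<>();
    }
  }

  List<List<T>> batches() {
    return emitted;
  }
}
```

Note the buffer is replaced, not cleared, after each emit, so the emitted
list isn't mutated later; the Beam DoFn above does the same.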

See
https://beam.apache.org/releases/javadoc/2.20.0/org/apache/beam/sdk/transforms/ParDo.html
for
more information on the lifetime of DoFns.

As for why your GBK is taking so long, yes, this can be a bottleneck.
However, it should be noted that Dataflow (like most other runners)
executes this step in conjunction with other steps as part of a "fused
stage." So if your pipeline looks like

    Read -> DoFnA -> GBK -> DoFnB -> Write

then Read, DoFnA, and GBK[part1] will execute concurrently (all starting up
almost immediately), one element at a time, and when that's finished,
GBK[part2], DoFnB, and Write will execute concurrently, one element at a
time. So you can't just look at the last unfinished stage to determine
where the bottleneck is. (One helpful tool, however, is looking at the
amount of time spent on each step in the UI.)
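To make the fusion barrier concrete, here is a toy plain-Java model of that
pipeline shape (an illustration only, not Dataflow's execution code; the
class and method names are made up): stage 1 processes every element before
stage 2 can start.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Models Read -> DoFnA -> GBK -> DoFnB -> Write as two fused stages
// separated by a barrier, the way a batch runner executes them.
class FusedStagesDemo {
  static List<String> run(List<String> input, Function<String, String> keyFn) {
    // Stage 1: fused Read + DoFnA + GBK[part1] (shuffle write),
    // one element at a time.
    Map<String, List<String>> grouped = new LinkedHashMap<>();
    for (String elt : input) {
      String transformed = elt.toUpperCase(); // stand-in for DoFnA
      grouped
          .computeIfAbsent(keyFn.apply(transformed), k -> new ArrayList<>())
          .add(transformed); // GBK[part1]: group by key
    }
    // Barrier: in batch mode, stage 2 starts only after stage 1 has
    // consumed all input, so stage 2 "waiting" need not be the bottleneck.
    // Stage 2: fused GBK[part2] (shuffle read) + DoFnB + Write.
    List<String> written = new ArrayList<>();
    for (Map.Entry<String, List<String>> e : grouped.entrySet()) {
      written.add(e.getKey() + ":" + e.getValue().size()); // DoFnB + Write
    }
    return written;
  }
}
```

The point of the model is the barrier comment in the middle: all the work
before it runs concurrently as one stage, and none of the work after it
starts until that stage is done.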

Hopefully that helps.

- Robert


On Thu, Apr 23, 2020 at 12:43 PM Aniruddh Sharma <asharma...@gmail.com>
wrote:

> Thanks Robert and Luke
>
> This approach seems good to me. I am trying that , i have to include a
> GroupBy to make Iterable<Rows> available to do ParDo function to do same.
> Now GroupBy is a bottleneck, its working for last 2 hours and proceed only
> 40 GB data (still waiting for rest of 100's of GB of data).
>
> Currently I used GroupByKey.Create()
>
> What's recommended way to use what key to make it execute faster like same
> key for all rows, vs different key for each row vs same row for a group of
> keys.
>
> Thanks
> Aniruddh
>
> On Thu, Apr 23, 2020 at 12:47 PM Luke Cwik <lc...@google.com> wrote:
>
>> As Robert suggested, what prevents you from doing:
>> ReadFromBQ -> ParDo(BatchInMemory) -> DLP
>> where BatchInMemory stores elements in the @ProcessElement method in an
>> in memory list and produce output every time the list is large enough with
>> a final output in the @FinishBundle method?
>>
>> On Thu, Apr 23, 2020 at 9:42 AM Aniruddh Sharma <asharma...@gmail.com>
>> wrote:
>>
>>> Hi Luke
>>>
>>> Sorry forgot to mention the functions. Dataflow adds following function
>>> and ["PartitionKeys", new GroupByKeyAndSortValuesOnly] this is super slow,
>>> How to choose keys to make it faster ?
>>>
>>>  .apply("ReifyWindows", ParDo.of(new ReifyWindowedValueFn<>()))
>>>           .setCoder(
>>>               KvCoder.of(
>>>                   keyCoder,
>>>                   KvCoder.of(InstantCoder.of(),
>>> WindowedValue.getFullCoder(kvCoder, windowCoder))))
>>>
>>>           // Group by key and sort by timestamp, dropping windows as
>>> they are reified
>>>           .apply("PartitionKeys", new GroupByKeyAndSortValuesOnly<>())
>>>
>>>           // The GBKO sets the windowing strategy to the global default
>>>           .setWindowingStrategyInternal(inputWindowingStrategy);
>>>
>>> THanks
>>> ANiruddh
>>>
>>> On 2020/04/23 16:35:58, Aniruddh Sharma <asharma...@gmail.com> wrote:
>>> > Thanks Luke for your response.
>>> >
>>> > My use case is following.
>>> > a) I read data from BQ (TableRow)
>>> > b) Convert it into (Table.Row) for DLP calls.
>>> > c) have to batch Table.Row collection up to a max size of 512 KB (i.e
>>> fit may rows from BQ into a single DLP table) and call DLP.
>>> >
>>> > Functionally, I don't have a need of key and window. As I just want to
>>> fit rows in DLP table up to a max size.
>>> >
>>> > In batch mode, when I call StateFulAPI,
>>> > it adds a "BatchStatefulParDoOverrides.GroupByKeyAndSortValuesOnly"
>>> step and this step is super slow. Like it is running on 50 node cluster for
>>> 800 GB data for last 10 hours.
>>> >
>>> > This step is not added when I call Dataflow in streaming mode. But I
>>> can't call it in Streaming mode for other reasons.
>>> >
>>> > So I am trying to understand following
>>> > a) Either I give a hint somehow to Dataflow runner not to add this
>>> step "BatchStatefulParDoOverrides.GroupByKeyAndSortValuesOnly"  at all,
>>> then I don't have any issues.
>>> > b) if it adds this step, then how should I choose my ARTIFICIALLY
>>> created keys that step can execute as fast as possible. It does a SORT by
>>> on timestamps on records. As I don't have any functional key requirement,
>>> shall I choose same keys for all rows vs randomkey for some rows vs random
>>> key for each row; what timestamps shall I add same for all rows ? to make
>>> this function work faster.
>>> >
>>> > Thanks
>>> > Aniruddh
>>> >
>>> > On 2020/04/23 16:15:44, Luke Cwik <lc...@google.com> wrote:
>>> > > Stateful & timely operations are always per key and window which is
>>> the
>>> > > GbkBeforeStatefulParDo is being added. Do you not need your stateful
>>> &
>>> > > timely operation to be done per key and window, if so can you explain
>>> > > further?
>>> > >
>>> > > On Thu, Apr 23, 2020 at 6:29 AM Aniruddh Sharma <
>>> asharma...@gmail.com>
>>> > > wrote:
>>> > >
>>> > > > Hi Kenn
>>> > > >
>>> > > > Thanks for your guidance, I understand that batch mode waits for
>>> previous
>>> > > > stage. But the real issue in this particular case is not only this.
>>> > > >
>>> > > > Dataflow runner adds a step automatically
>>> > > > "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo" which not
>>> only waits
>>> > > > for previous stage but it waits for a very very very long time. Is
>>> there a
>>> > > > way to give hint to Dataflow runner not to add this step, as in my
>>> case I
>>> > > > functionally do not require this step.
>>> > > >
>>> > > > Thanks for your suggestion, will create another thread to
>>> understand BQ
>>> > > > options
>>> > > >
>>> > > > Thanks
>>> > > > Aniruddh
>>> > > >
>>> > > > On 2020/04/23 03:51:31, Kenneth Knowles <k...@apache.org> wrote:
>>> > > > > The definition of batch mode for Dataflow is this: completely
>>> compute the
>>> > > > > result of one stage of computation before starting the next
>>> stage. There
>>> > > > is
>>> > > > > no way around this. It does not have to do with using state and
>>> timers.
>>> > > > >
>>> > > > > If you are working with state & timers & triggers, and you are
>>> hoping for
>>> > > > > output before the pipeline is completely terminated, then you
>>> most likely
>>> > > > > want streaming mode. Perhaps it is best to investigate the BQ
>>> read
>>> > > > > performance issue.
>>> > > > >
>>> > > > > Kenn
>>> > > > >
>>> > > > > On Wed, Apr 22, 2020 at 4:04 PM Aniruddh Sharma <
>>> asharma...@gmail.com>
>>> > > > > wrote:
>>> > > > >
>>> > > > > > Hi
>>> > > > > >
>>> > > > > > I am reading a bounded collection from BQ.
>>> > > > > >
>>> > > > > > I have to use a Stateful & Timely operation.
>>> > > > > >
>>> > > > > > a) I am invoking job in batch mode. Dataflow runner adds a step
>>> > > > > > "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo" which has
>>> > > > partitionBy.
>>> > > > > > This partitionBy waits for all the data to come and becomes a
>>> > > > bottleneck.
>>> > > > > > when I read about its documentation it seems its objective it
>>> to be
>>> > > > added
>>> > > > > > when there are no windows.
>>> > > > > >
>>> > > > > > I tried added windows and triggering them before stateful
>>> step, but
>>> > > > > > everything comes to this partitionBy step and waits till all
>>> data is
>>> > > > here.
>>> > > > > >
>>> > > > > > Is there a way to write code in some way (like window etc) or
>>> give
>>> > > > > > Dataflow a hint not to add this step in.
>>> > > > > >
>>> > > > > > b) I dont want to call this job in streaming mode, When I call
>>> in
>>> > > > > > streaming mode, this Dataflow runner does not add this step,
>>> but in
>>> > > > > > Streaming BQ read becomes a bottleneck.
>>> > > > > >
>>> > > > > > So either I have to solve how I read BQ faster if I call job in
>>> > > > Streaming
>>> > > > > > mode or How I bypass this partitionBy from
>>> > > > > > "BatchStatefulParDoOverrides.GbkBeforeStatefulParDo" if I
>>> invoke job in
>>> > > > > > batch mode ?
>>> > > > > >
>>> > > > > > Thanks
>>> > > > > > Aniruddh
>>> > > > > >
>>> > > > > >
>>> > > > > >
>>> > > > > >
>>> > > > >
>>> > > >
>>> > >
>>> >
>>>
>>
