Hi Abdul,

Going back to your use case: if the goal is to batch elements read from an 
unbounded source, you can use the GroupIntoBatches transform, which groups 
elements into batches (Iterables) of the size you specify. You can then 
process each batch downstream in your pipeline.
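For example, a minimal sketch (the names and the batch size of 100 are 
illustrative, and it assumes the input is already keyed, since 
GroupIntoBatches operates on KV pairs):

import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Groups up to 100 elements per key into an Iterable; on an unbounded input
// a batch is emitted once it reaches 100 elements (or the window expires).
static PCollection<KV<String, Iterable<String>>> batchPerKey(
    PCollection<KV<String, String>> keyedEvents) {
  return keyedEvents.apply(
      "BatchPerKey", GroupIntoBatches.<String, String>ofSize(100));
}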
PS: to add an example to Eugene's list: I think in batch mode the Spark runner 
evaluates the size of the data in the BoundedSource and then divides it by the 
number of available workers, so it ends up with one bundle per worker of size 
= total / number of workers.
Etienne
On Tuesday, May 22, 2018 at 11:35 -0700, Eugene Kirpichov wrote:
> Different runners decide it differently.
> E.g. for the Dataflow runner: in batch mode, bundles are usually quite large, 
> e.g. something like several-dozen-MB
> chunks of files, or pretty big key ranges of something like BigTable or 
> GroupByKey output. The bundle sizes are not
> known in advance (e.g. when the runner produces a bundle "read key range [a, 
> b)" the runner has no way of knowing how
> many keys there actually are between a and b), and they even change as the 
> job runs [1]. In streaming mode, bundles
> usually close after either a few thousand elements read from the source, or a 
> few seconds, whichever happens first -
> nothing too fancy going on.
> 
> The Flink runner currently puts each element in its own bundle, but this is 
> quite inefficient and a known performance issue. Spark, I don't know. The 
> Direct runner, I think, uses a mix of these strategies.
> 
> Basically, if you want batching, you have to do it yourself, in a way that 
> does not violate runner bundle boundaries
> (don't batch across a FinishBundle). In practice this is trivial to implement 
> and never much of a problem.
> 
> [1] 
> https://qconlondon.com/system/files/presentation-slides/straggler-free_data_processing_in_cloud_dataflow.pdf
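
To make that batching pattern concrete, here is a minimal sketch (my 
illustrative names and an arbitrary batch size of 100; it also assumes the 
input is in the global window, since @FinishBundle needs an explicit window to 
output into): buffer elements per bundle, emit full batches from 
@ProcessElement, and flush whatever remains in @FinishBundle so no batch ever 
crosses a bundle boundary.

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.joda.time.Instant;

class BatchingDoFn extends DoFn<String, List<String>> {
  private static final int MAX_BATCH_SIZE = 100;  // illustrative size
  private transient List<String> batch;

  @StartBundle
  public void startBundle() {
    // Fresh buffer for every bundle; nothing carries over between bundles.
    batch = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    batch.add(c.element());
    if (batch.size() >= MAX_BATCH_SIZE) {
      c.output(new ArrayList<>(batch));
      batch.clear();
    }
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext c) {
    // Flush the partial batch when the runner closes the bundle.
    // Assumes global windowing; with other windowing you would have to
    // track the windows of the buffered elements yourself.
    if (!batch.isEmpty()) {
      c.output(new ArrayList<>(batch), Instant.now(), GlobalWindow.INSTANCE);
      batch.clear();
    }
  }
}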
> On Tue, May 22, 2018 at 1:12 AM Abdul Qadeer <quadeer....@gmail.com> wrote:
> > Hi Eugene!
> > I had gone through that link before sending an email here. It does a decent 
> > job of explaining when to use which method and what kind of optimisations 
> > we are looking at, but it didn't really answer my question, i.e. how to 
> > control the granularity (the number of elements of a PCollection) in a 
> > bundle. Kenneth made it clear that it is not under user control, but now I 
> > am interested in how the runner decides it.
> > 
> > 
> > > On May 21, 2018, at 7:55 PM, Eugene Kirpichov <kirpic...@google.com> 
> > > wrote:
> > > 
> > > Hi Abdul,
> > > Please see 
> > > https://stackoverflow.com/questions/45985753/what-is-the-difference-between-dofn-setup-and-dofn-startbundle
> > > - let me know if it answers your question sufficiently.
> > > On Mon, May 21, 2018 at 7:04 PM Abdul Qadeer <quadeer....@gmail.com> 
> > > wrote:
> > > > Hi!
> > > > I was trying to understand the behavior of StartBundle and FinishBundle 
> > > > w.r.t. DoFns.
> > > > I have an unbounded data source and I am trying to leverage bundling to 
> > > > achieve batching.
> > > > From the docs of ParDo:
> > > > 
> > > > "when a ParDo transform is executed, the elements of the input 
> > > > PCollection are first divided up into some number
> > > > of "bundles"
> > > > 
> > > > 
> > > > I would like to know if bundling is possible for unbounded data in the 
> > > > first place. If it is, how do I control the bundle size, i.e. the 
> > > > number of elements of a given PCollection in that bundle?
