Le mardi 17 juillet 2018 à 09:48 -0700, Eugene Kirpichov a écrit :
> On Tue, Jul 17, 2018 at 2:49 AM Etienne Chauchot <echauc...@apache.org> wrote:
> > Hi Eugene
> > Le lundi 16 juillet 2018 à 07:52 -0700, Eugene Kirpichov a écrit :
> > > Hi Etienne - thanks for catching this; indeed, I somehow missed that 
> > > actually several runners do this same thing -
> > > it seemed to me as something that can be done in user code (because it 
> > > involves combining estimated size + split
> > > in pretty much the same way), 
> > 
> > When you say "user code", you mean IO writter code by opposition to runner 
> > code right ?
> Correct: "user code" is what happens in the SDK or the user pipeline.
>  
> >  
> > 
> > 
> > > but I'm not so sure: even though many runners have a "desired 
> > > parallelism" option or alike, it's not all of them,
> > > so we can't use such an option universally.
> > 
> > Agree, cannot be universal
> > > Maybe then the right thing to do is to:
> > > - Use bounded SDFs for these
> > > - Change SDF @SplitRestriction API to take a desired number of splits as 
> > > a parameter, and introduce an API
> > > @EstimateOutputSizeBytes(element) valid only on bounded SDFs
> > Agree with the idea but EstimateOutpuSize must return the size of the 
> > dataset not of an element.
> Please recall that the element here is e.g. a filename, or name of a BigTable 
> table, or something like that - i.e. the
> element describes the dataset, and the restriction describes what part of the 
> dataset
> 
> If e.g. we have a PCollection<String> of filenames and apply a ReadTextFn SDF 
> to it, and want the runner to know the
> total size of all files - the runner could insert some transforms to apply 
> EstimateOutputSize to each element and
> Sum.globally() them.

You're right, I missunderstood what you meant by element. The important is that 
the runner could at some point before
calling @SplitRestriction know the size of the dataset, potentially with the 
Sum you mentioned.

>  
> >  On some runners, each worker is set to a given amount of heap.  Thus, it 
> > is important that a runner could evaluate
> > the size of the whole dataset to determine the size of each split (to fit 
> > in memory of the workers) and thus tell
> > the bounded SDF the number of desired splits.
> > > - Add some plumbing to the standard bounded SDF expansion so that 
> > > different runners can compute that parameter
> > > differently, the two standard ways being "split into given number of 
> > > splits" or "split based on the sub-linear
> > > formula of estimated size".
> > > 
> > > I think this would work, though this is somewhat more work than I 
> > > anticipated. Any alternative ideas?
> > +1 It will be very similar for an IO developer (@EstimateOutputSizeBytes 
> > will be similar to
> > source.getEstimatedSizeBytes(), and @SplitRestriction(desiredSplits) 
> > similar to source.split(desiredBundleSize))
> Yeah I'm not sure this is actually a good thing that these APIs end up so 
> similar to the old ones - I was hoping we
> could come up with something better - but seems like there's no viable 
> alternative at this point :) 
> > Etienne
> > > On Mon, Jul 16, 2018 at 3:07 AM Etienne Chauchot <echauc...@apache.org> 
> > > wrote:
> > > > Hi, 
> > > > thanks Eugene for analyzing and sharing that.
> > > > I have one comment inline
> > > > 
> > > > Etienne
> > > > 
> > > > Le dimanche 15 juillet 2018 à 14:20 -0700, Eugene Kirpichov a écrit :
> > > > > Hey beamers,
> > > > > I've always wondered whether the BoundedSource implementations in the 
> > > > > Beam SDK are worth their complexity, or
> > > > > whether they rather could be converted to the much easier to code 
> > > > > ParDo style, which is also more modular and
> > > > > allows you to very easily implement readAll().
> > > > > 
> > > > > There's a handful: file-based sources, BigQuery, Bigtable, HBase, 
> > > > > Elasticsearch, MongoDB, Solr and a couple
> > > > > more.
> > > > > 
> > > > > Curiously enough, BoundedSource vs. ParDo matters *only* on Dataflow, 
> > > > > because AFAICT Dataflow is the only
> > > > > runner that cares about the things that BoundedSource can do and 
> > > > > ParDo can't:
> > > > > - size estimation (used to choose an initial number of workers) [ok, 
> > > > > Flink calls the function to return
> > > > > statistics, but doesn't seem to do anything else with it]
> > > > 
> > > > => Spark uses size estimation to set desired bundle size with something 
> > > > like desiredBundleSize = estimatedSize /
> > > > nbOfWorkersConfigured (partitions)
> > > > See 
> > > > https://github.com/apache/beam/blob/a5634128d194161aebc8d03229fdaa1066cf7739/runners/spark/src/main/java/org
> > > > /apache/beam/runners/spark/io/SourceRDD.java#L101
> > > > 
> > > > 
> > > > > - splitting into bundles of given size (Dataflow chooses the number 
> > > > > of bundles to create based on a simple
> > > > > formula that's not entirely unlike K*sqrt(size))
> > > > > - liquid sharding (splitAtFraction())
> > > > > 
> > > > > If Dataflow didn't exist, there'd be no reason at all to use 
> > > > > BoundedSource. So the question "which ones can be
> > > > > converted to ParDo" is really "which ones are used on Dataflow in 
> > > > > ways that make these functions matter".
> > > > > Previously, my conservative assumption was that the answer is "all of 
> > > > > them", but turns out this is not so.
> > > > > 
> > > > > Liquid sharding always matters; if the source is liquid-shardable, 
> > > > > for now we have to keep it a source (until
> > > > > SDF gains liquid sharding - which should happen in a quarter or two I 
> > > > > think).
> > > > > 
> > > > > Choosing number of bundles to split into is easily done in SDK code, 
> > > > > see https://github.com/apache/beam/pull/5
> > > > > 886 for example; DatastoreIO does something similar.
> > > > > 
> > > > > The remaining thing to analyze is, when does initial scaling matter. 
> > > > > So as a member of the Dataflow team, I
> > > > > analyzed statistics of production Dataflow jobs in the past month. I 
> > > > > can not share my queries nor the data,
> > > > > because they are proprietary to Google - so I am sharing just the 
> > > > > general methodology and conclusions, because
> > > > > they matter to the Beam community. I looked at a few criteria, such 
> > > > > as:
> > > > > - The job should be not too short and not too long: if it's too short 
> > > > > then scaling couldn't have kicked in
> > > > > much at all; if it's too long then dynamic autoscaling would have 
> > > > > been sufficient.
> > > > > - The job should use, at peak, at least a handful of workers 
> > > > > (otherwise means it wasn't used in settings where
> > > > > much scaling happened)
> > > > > After a couple more rounds of narrowing-down, with some hand-checking 
> > > > > that the results and criteria so far
> > > > > make sense, I ended up with nothing - no jobs that would have 
> > > > > suffered a serious performance regression if
> > > > > their BoundedSource had not supported initial size estimation [of 
> > > > > course, except for the liquid-shardable
> > > > > ones].
> > > > > 
> > > > > Based on this, I would like to propose to convert the following 
> > > > > BoundedSource-based IOs to ParDo-based, and
> > > > > while we're at it, probably also add readAll() versions (not 
> > > > > necessarily in exactly the same PR):
> > > > > - ElasticsearchIO
> > > > > - SolrIO
> > > > > - MongoDbIO
> > > > > - MongoDbGridFSIO
> > > > > - CassandraIO
> > > > > - HCatalogIO
> > > > > - HadoopInputFormatIO
> > > > > - UnboundedToBoundedSourceAdapter (already have a PR in progress for 
> > > > > this one)
> > > > > These would not translate to a single ParDo - rather, they'd 
> > > > > translate to ParDo(estimate size and split
> > > > > according to the formula), Reshuffle, ParDo(read data) - or possibly 
> > > > > to a bounded SDF doing roughly the same
> > > > > (luckily after https://github.com/apache/beam/pull/5940 all runners 
> > > > > at master will support bounded SDF so this
> > > > > is safe compatibility-wise). Pretty much like DatastoreIO does.
> > > > > 
> > > > > I would like to also propose to change the IO authoring guide 
> > > > > https://beam.apache.org/documentation/io/authori
> > > > > ng-overview/#when-to-implement-using-the-source-api to basically say 
> > > > > "Never implement a new BoundedSource
> > > > > unless you can support liquid sharding". And add a utility for 
> > > > > computing a desired number of splits.
> > > > > 
> > > > > There might be some more details here to iron out, but I wanted to 
> > > > > check with the community that this overall
> > > > > makes sense.
> > > > > 
> > > > > Thanks.

Reply via email to