Hi Eugene
On Monday, July 16, 2018 at 07:52 -0700, Eugene Kirpichov wrote:
> Hi Etienne - thanks for catching this; indeed, I somehow missed that actually 
> several runners do this same thing - it
> seemed to me as something that can be done in user code (because it involves 
> combining estimated size + split in
> pretty much the same way), 

When you say "user code", you mean IO writer code as opposed to runner code, 
right? 


> but I'm not so sure: even though many runners have a "desired parallelism" 
> option or alike, it's not all of them, so
> we can't use such an option universally.

Agreed, this cannot be universal.
> Maybe then the right thing to do is to:
> - Use bounded SDFs for these
> - Change SDF @SplitRestriction API to take a desired number of splits as a 
> parameter, and introduce an API
> @EstimateOutputSizeBytes(element) valid only on bounded SDFs
Agree with the idea, but @EstimateOutputSizeBytes must return the size of the 
dataset, not of an element. On some runners, each worker is given a fixed 
amount of heap. It is therefore important that a runner can evaluate the size 
of the whole dataset to determine the size of each split (so that a split fits 
in a worker's memory) and thus tell the bounded SDF the desired number of 
splits.
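The sizing logic described above can be sketched standalone. This is a minimal illustration, assuming a hypothetical helper (the class and method names are made up, not Beam API) where the runner knows the dataset's estimated size and a per-worker memory budget:

```java
// Hypothetical helper, not part of Beam: derive a desired number of splits
// so that no split exceeds a per-worker memory budget.
class SplitPlanner {
  static long desiredSplits(long estimatedDatasetBytes, long perWorkerBudgetBytes) {
    // Ceiling division: enough splits that each one fits in the budget.
    return Math.max(1, (estimatedDatasetBytes + perWorkerBudgetBytes - 1) / perWorkerBudgetBytes);
  }
}
```

The runner would then pass this number to the bounded SDF's split-restriction step.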
> - Add some plumbing to the standard bounded SDF expansion so that different 
> runners can compute that parameter
> differently, the two standard ways being "split into given number of splits" 
> or "split based on the sub-linear formula
> of estimated size".
> 
> I think this would work, though this is somewhat more work than I 
> anticipated. Any alternative ideas?
+1. It will be very similar for an IO developer (@EstimateOutputSizeBytes will 
be similar to source.getEstimatedSizeBytes(), and @SplitRestriction(desiredSplits) 
similar to source.split(desiredBundleSize)).
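For illustration, the kind of arithmetic a @SplitRestriction(desiredSplits) method would carry, splitting an offset range into a desired number of sub-ranges, can be sketched standalone (the class and method here are hypothetical, not proposed API):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: split [from, to) into up to desiredSplits sub-ranges.
// Empty sub-ranges are dropped, so very small inputs yield fewer splits.
class RangeSplitter {
  static List<long[]> split(long from, long to, int desiredSplits) {
    List<long[]> out = new ArrayList<>();
    long size = to - from;
    for (int i = 0; i < desiredSplits; i++) {
      long start = from + size * i / desiredSplits;
      long end = from + size * (i + 1) / desiredSplits;
      if (end > start) out.add(new long[] {start, end});
    }
    return out;
  }
}
```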
Etienne
> On Mon, Jul 16, 2018 at 3:07 AM Etienne Chauchot <echauc...@apache.org> wrote:
> > Hi, 
> > thanks Eugene for analyzing and sharing that.
> > I have one comment inline
> > 
> > Etienne
> > 
> > On Sunday, July 15, 2018 at 14:20 -0700, Eugene Kirpichov wrote:
> > > Hey beamers,
> > > I've always wondered whether the BoundedSource implementations in the 
> > > Beam SDK are worth their complexity, or
> > > whether they rather could be converted to the much easier to code ParDo 
> > > style, which is also more modular and
> > > allows you to very easily implement readAll().
> > > 
> > > There's a handful: file-based sources, BigQuery, Bigtable, HBase, 
> > > Elasticsearch, MongoDB, Solr and a couple more.
> > > 
> > > Curiously enough, BoundedSource vs. ParDo matters *only* on Dataflow, 
> > > because AFAICT Dataflow is the only runner
> > > that cares about the things that BoundedSource can do and ParDo can't:
> > > - size estimation (used to choose an initial number of workers) [ok, 
> > > Flink calls the function to return
> > > statistics, but doesn't seem to do anything else with it]
> > 
> > => Spark uses size estimation to set desired bundle size with something 
> > like desiredBundleSize = estimatedSize /
> > nbOfWorkersConfigured (partitions)
> > See
> > https://github.com/apache/beam/blob/a5634128d194161aebc8d03229fdaa1066cf7739/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java#L101
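The Spark sizing idea referenced above reduces to simple arithmetic. A simplified standalone sketch (not the actual SourceRDD code; the helper name is made up):

```java
// Simplified version of the Spark runner's sizing idea:
// desired bundle size = estimated total size / configured partition count.
class BundleSizing {
  static long desiredBundleSize(long estimatedSizeBytes, int numPartitions) {
    // Guard against zero partitions and zero-byte estimates.
    return Math.max(1, estimatedSizeBytes / Math.max(1, numPartitions));
  }
}
```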
> > 
> > 
> > > - splitting into bundles of given size (Dataflow chooses the number of 
> > > bundles to create based on a simple formula
> > > that's not entirely unlike K*sqrt(size))
> > > - liquid sharding (splitAtFraction())
> > > 
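To make the sub-linear formula mentioned above concrete, here is a toy version of a K*sqrt(size) split count. K and the lower clamp are made-up illustrative constants, not Dataflow's actual values:

```java
// Toy sub-linear split-count formula of the K*sqrt(size) shape.
// The constant k is arbitrary here; Dataflow's real formula differs.
class SublinearSplits {
  static int numBundles(long sizeBytes, double k) {
    return (int) Math.max(1, Math.round(k * Math.sqrt((double) sizeBytes)));
  }
}
```

The point of a sub-linear formula is that bundle count grows more slowly than data size, keeping per-bundle overhead bounded for large inputs.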
> > > If Dataflow didn't exist, there'd be no reason at all to use 
> > > BoundedSource. So the question "which ones can be
> > > converted to ParDo" is really "which ones are used on Dataflow in ways 
> > > that make these functions matter".
> > > Previously, my conservative assumption was that the answer is "all of 
> > > them", but turns out this is not so.
> > > 
> > > Liquid sharding always matters; if the source is liquid-shardable, for 
> > > now we have to keep it a source (until SDF
> > > gains liquid sharding - which should happen in a quarter or two I think).
> > > 
> > > Choosing number of bundles to split into is easily done in SDK code, see 
> > > https://github.com/apache/beam/pull/5886 
> > > for example; DatastoreIO does something similar.
> > > 
> > > The remaining thing to analyze is, when does initial scaling matter. So 
> > > as a member of the Dataflow team, I
> > > analyzed statistics of production Dataflow jobs in the past month. I can 
> > > not share my queries nor the data,
> > > because they are proprietary to Google - so I am sharing just the general 
> > > methodology and conclusions, because
> > > they matter to the Beam community. I looked at a few criteria, such as:
> > > - The job should be not too short and not too long: if it's too short 
> > > then scaling couldn't have kicked in much at
> > > all; if it's too long then dynamic autoscaling would have been sufficient.
> > > - The job should use, at peak, at least a handful of workers (otherwise 
> > > means it wasn't used in settings where
> > > much scaling happened)
> > > After a couple more rounds of narrowing-down, with some hand-checking 
> > > that the results and criteria so far make
> > > sense, I ended up with nothing - no jobs that would have suffered a 
> > > serious performance regression if their
> > > BoundedSource had not supported initial size estimation [of course, 
> > > except for the liquid-shardable ones].
> > > 
> > > Based on this, I would like to propose to convert the following 
> > > BoundedSource-based IOs to ParDo-based, and while
> > > we're at it, probably also add readAll() versions (not necessarily in 
> > > exactly the same PR):
> > > - ElasticsearchIO
> > > - SolrIO
> > > - MongoDbIO
> > > - MongoDbGridFSIO
> > > - CassandraIO
> > > - HCatalogIO
> > > - HadoopInputFormatIO
> > > - UnboundedToBoundedSourceAdapter (already have a PR in progress for this 
> > > one)
> > > These would not translate to a single ParDo - rather, they'd translate to 
> > > ParDo(estimate size and split according
> > > to the formula), Reshuffle, ParDo(read data) - or possibly to a bounded 
> > > SDF doing roughly the same (luckily after 
> > > https://github.com/apache/beam/pull/5940 all runners at master will 
> > > support bounded SDF so this is safe
> > > compatibility-wise). Pretty much like DatastoreIO does.
> > > 
> > > I would like to also propose to change the IO authoring guide
> > > https://beam.apache.org/documentation/io/authoring-overview/#when-to-implement-using-the-source-api
> > > to basically say "Never implement a new BoundedSource unless you
> > > can support liquid sharding". And add a utility for computing a desired
> > > number of splits.
> > > 
> > > There might be some more details here to iron out, but I wanted to check 
> > > with the community that this overall
> > > makes sense.
> > > 
> > > Thanks.
