Hi, thanks Eugene for analyzing and sharing that.I have one comment inline
Etienne
Le dimanche 15 juillet 2018 à 14:20 -0700, Eugene Kirpichov a écrit :
> Hey beamers,
> I've always wondered whether the BoundedSource implementations in the Beam 
> SDK are worth their complexity, or whether
> they rather could be converted to the much easier to code ParDo style, which 
> is also more modular and allows you to
> very easily implement readAll().
> 
> There's a handful: file-based sources, BigQuery, Bigtable, HBase, 
> Elasticsearch, MongoDB, Solr and a couple more.
> 
> Curiously enough, BoundedSource vs. ParDo matters *only* on Dataflow, because 
> AFAICT Dataflow is the only runner that
> cares about the things that BoundedSource can do and ParDo can't:
> - size estimation (used to choose an initial number of workers) [ok, Flink 
> calls the function to return statistics,
> but doesn't seem to do anything else with it]
=> Spark uses size estimation to set desired bundle size with something like 
desiredBundleSize = estimatedSize /
nbOfWorkersConfigured (partitions)See 
https://github.com/apache/beam/blob/a5634128d194161aebc8d03229fdaa1066cf7739/runne
rs/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java#L101

> - splitting into bundles of given size (Dataflow chooses the number of 
> bundles to create based on a simple formula
> that's not entirely unlike K*sqrt(size))
> - liquid sharding (splitAtFraction())
> 
> If Dataflow didn't exist, there'd be no reason at all to use BoundedSource. 
> So the question "which ones can be
> converted to ParDo" is really "which ones are used on Dataflow in ways that 
> make these functions matter". Previously,
> my conservative assumption was that the answer is "all of them", but turns 
> out this is not so.
> 
> Liquid sharding always matters; if the source is liquid-shardable, for now we 
> have to keep it a source (until SDF
> gains liquid sharding - which should happen in a quarter or two I think).
> 
> Choosing number of bundles to split into is easily done in SDK code, see 
> https://github.com/apache/beam/pull/5886 for
> example; DatastoreIO does something similar.
> 
> The remaining thing to analyze is, when does initial scaling matter. So as a 
> member of the Dataflow team, I analyzed
> statistics of production Dataflow jobs in the past month. I can not share my 
> queries nor the data, because they are
> proprietary to Google - so I am sharing just the general methodology and 
> conclusions, because they matter to the Beam
> community. I looked at a few criteria, such as:
> - The job should be not too short and not too long: if it's too short then 
> scaling couldn't have kicked in much at
> all; if it's too long then dynamic autoscaling would have been sufficient.
> - The job should use, at peak, at least a handful of workers (otherwise means 
> it wasn't used in settings where much
> scaling happened)
> After a couple more rounds of narrowing-down, with some hand-checking that 
> the results and criteria so far make sense,
> I ended up with nothing - no jobs that would have suffered a serious 
> performance regression if their BoundedSource had
> not supported initial size estimation [of course, except for the 
> liquid-shardable ones].
> 
> Based on this, I would like to propose to convert the following 
> BoundedSource-based IOs to ParDo-based, and while
> we're at it, probably also add readAll() versions (not necessarily in exactly 
> the same PR):
> - ElasticsearchIO
> - SolrIO
> - MongoDbIO
> - MongoDbGridFSIO
> - CassandraIO
> - HCatalogIO
> - HadoopInputFormatIO
> - UnboundedToBoundedSourceAdapter (already have a PR in progress for this one)
> These would not translate to a single ParDo - rather, they'd translate to 
> ParDo(estimate size and split according to
> the formula), Reshuffle, ParDo(read data) - or possibly to a bounded SDF 
> doing roughly the same (luckily after https:/
> /github.com/apache/beam/pull/5940 all runners at master will support bounded 
> SDF so this is safe compatibility-wise).
> Pretty much like DatastoreIO does.
> 
> I would like to also propose to change the IO authoring guide 
> https://beam.apache.org/documentation/io/authoring-overv
> iew/#when-to-implement-using-the-source-api to basically say "Never implement 
> a new BoundedSource unless you can
> support liquid sharding". And add a utility for computing a desired number of 
> splits.
> 
> There might be some more details here to iron out, but I wanted to check with 
> the community that this overall makes
> sense.
> 
> Thanks.

Reply via email to