Hi,

Thanks Eugene for analyzing and sharing that. I have one comment inline.

Etienne

On Sunday, July 15, 2018 at 14:20 -0700, Eugene Kirpichov wrote:
> Hey beamers,
>
> I've always wondered whether the BoundedSource implementations in the Beam
> SDK are worth their complexity, or whether they could rather be converted
> to the much easier-to-code ParDo style, which is also more modular and
> allows you to very easily implement readAll().
>
> There's a handful: file-based sources, BigQuery, Bigtable, HBase,
> Elasticsearch, MongoDB, Solr and a couple more.
>
> Curiously enough, BoundedSource vs. ParDo matters *only* on Dataflow,
> because AFAICT Dataflow is the only runner that cares about the things
> that BoundedSource can do and ParDo can't:
> - size estimation (used to choose an initial number of workers) [ok, Flink
>   calls the function to return statistics, but doesn't seem to do anything
>   else with it]

=> Spark uses size estimation to set the desired bundle size, with
something like desiredBundleSize = estimatedSize / nbOfWorkersConfigured
(partitions). See
https://github.com/apache/beam/blob/a5634128d194161aebc8d03229fdaa1066cf7739/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java#L101
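For illustration, that computation amounts to roughly the following (a sketch of the idea, not the actual SourceRDD code; the method name and the floor of 1 are mine):

```java
public class BundleSizing {
  // Sketch of the Spark runner's approach: divide the source's estimated
  // byte size evenly across the configured number of partitions (workers).
  // The floor of 1 avoids a zero-sized bundle for tiny sources.
  static long desiredBundleSize(long estimatedSizeBytes, int numPartitions) {
    return Math.max(1L, estimatedSizeBytes / numPartitions);
  }

  public static void main(String[] args) {
    // e.g. a 1 GiB source spread over 8 partitions -> 128 MiB bundles
    System.out.println(desiredBundleSize(1L << 30, 8));
  }
}
```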
> - splitting into bundles of a given size (Dataflow chooses the number of
>   bundles to create based on a simple formula that's not entirely unlike
>   K*sqrt(size))
> - liquid sharding (splitAtFraction())
>
> If Dataflow didn't exist, there'd be no reason at all to use
> BoundedSource. So the question "which ones can be converted to ParDo" is
> really "which ones are used on Dataflow in ways that make these functions
> matter". Previously, my conservative assumption was that the answer is
> "all of them", but it turns out this is not so.
>
> Liquid sharding always matters; if the source is liquid-shardable, for now
> we have to keep it a source (until SDF gains liquid sharding - which
> should happen in a quarter or two, I think).
>
> Choosing the number of bundles to split into is easily done in SDK code,
> see https://github.com/apache/beam/pull/5886 for example; DatastoreIO
> does something similar.
>
> The remaining thing to analyze is: when does initial scaling matter? As a
> member of the Dataflow team, I analyzed statistics of production Dataflow
> jobs over the past month. I cannot share my queries or the data, because
> they are proprietary to Google - so I am sharing just the general
> methodology and conclusions, because they matter to the Beam community. I
> looked at a few criteria, such as:
> - The job should be not too short and not too long: if it's too short,
>   then scaling couldn't have kicked in much at all; if it's too long, then
>   dynamic autoscaling would have been sufficient.
> - The job should use, at peak, at least a handful of workers (otherwise it
>   means the source wasn't used in settings where much scaling happened).
> After a couple more rounds of narrowing down, with some hand-checking that
> the results and criteria so far make sense, I ended up with nothing - no
> jobs that would have suffered a serious performance regression if their
> BoundedSource had not supported initial size estimation [of course, except
> for the liquid-shardable ones].
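To make the "not entirely unlike K*sqrt(size)" idea concrete, here is a hedged sketch of such a split-count heuristic. K, the clamping bounds and all names are illustrative, not Dataflow's actual constants:

```java
public class SplitCount {
  // Sketch of a Dataflow-like heuristic for choosing how many bundles to
  // split a bounded source into: proportional to sqrt(estimated size),
  // clamped to a sane range.
  static int desiredNumSplits(long estimatedSizeBytes, double k, int maxSplits) {
    int n = (int) Math.ceil(k * Math.sqrt((double) estimatedSizeBytes));
    return Math.min(maxSplits, Math.max(1, n));
  }

  public static void main(String[] args) {
    // sqrt(1e6) = 1000; with K = 0.01 that yields 10 splits
    System.out.println(desiredNumSplits(1_000_000L, 0.01, 1000));
  }
}
```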
>
> Based on this, I would like to propose converting the following
> BoundedSource-based IOs to ParDo-based ones, and while we're at it,
> probably also adding readAll() versions (not necessarily in exactly the
> same PR):
> - ElasticsearchIO
> - SolrIO
> - MongoDbIO
> - MongoDbGridFSIO
> - CassandraIO
> - HCatalogIO
> - HadoopInputFormatIO
> - UnboundedToBoundedSourceAdapter (already have a PR in progress for this
>   one)
> These would not translate to a single ParDo - rather, they'd translate to
> ParDo(estimate size and split according to the formula), Reshuffle,
> ParDo(read data) - or possibly to a bounded SDF doing roughly the same
> (luckily, after https://github.com/apache/beam/pull/5940 all runners at
> master will support bounded SDF, so this is safe compatibility-wise).
> Pretty much like DatastoreIO does.
>
> I would like to also propose changing the IO authoring guide
> https://beam.apache.org/documentation/io/authoring-overview/#when-to-implement-using-the-source-api
> to basically say "Never implement a new BoundedSource unless you can
> support liquid sharding", and adding a utility for computing a desired
> number of splits.
>
> There might be some more details here to iron out, but I wanted to check
> with the community that this overall makes sense.
>
> Thanks.
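To illustrate the shape of that split-Reshuffle-read pattern without pulling in Beam, here is a dependency-free sketch of the logic (all names are mine; the Reshuffle stage, which only redistributes elements across workers, is indicated by a comment):

```java
import java.util.ArrayList;
import java.util.List;

public class ParDoStyleRead {
  // A shard descriptor emitted by the splitting ParDo: a [start, end) range.
  static class Range {
    final long start;
    final long end;
    Range(long start, long end) { this.start = start; this.end = end; }
  }

  // Stage 1, "ParDo(estimate size and split)": cut the keyspace into
  // roughly equal ranges, one per desired bundle.
  static List<Range> split(long totalRecords, int numBundles) {
    List<Range> ranges = new ArrayList<>();
    long step = Math.max(1L, (totalRecords + numBundles - 1) / numBundles);
    for (long start = 0; start < totalRecords; start += step) {
      ranges.add(new Range(start, Math.min(totalRecords, start + step)));
    }
    return ranges;
  }

  // Stage 2 would be a Reshuffle: redistribute the Range descriptors
  // across workers so the reads run in parallel.

  // Stage 3, "ParDo(read data)": each worker reads its assigned range.
  // Here a stand-in that just counts the records a range query would return.
  static long read(Range r) {
    return r.end - r.start;
  }

  public static void main(String[] args) {
    long total = 0;
    for (Range r : split(1000, 7)) {
      total += read(r);
    }
    // Every record is covered exactly once across the 7 bundles.
    System.out.println(total);
  }
}
```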