Hey beamers, I've always wondered whether the BoundedSource implementations in the Beam SDK are worth their complexity, or whether they could instead be converted to the much easier-to-code ParDo style, which is also more modular and makes it very easy to implement readAll().
There's a handful: file-based sources, BigQuery, Bigtable, HBase, Elasticsearch, MongoDB, Solr and a couple more.

Curiously enough, BoundedSource vs. ParDo matters *only* on Dataflow, because AFAICT Dataflow is the only runner that cares about the things BoundedSource can do and ParDo can't:

- size estimation (used to choose an initial number of workers) [ok, Flink calls the function that returns statistics, but doesn't seem to do anything else with it]
- splitting into bundles of a given size (Dataflow chooses the number of bundles to create based on a simple formula that's not entirely unlike K*sqrt(size))
- liquid sharding (splitAtFraction())

If Dataflow didn't exist, there'd be no reason at all to use BoundedSource. So the question "which ones can be converted to ParDo" is really "which ones are used on Dataflow in ways that make these functions matter". Previously, my conservative assumption was that the answer is "all of them", but it turns out this is not so.

Liquid sharding always matters: if a source is liquid-shardable, for now we have to keep it a source (until SDF gains liquid sharding, which I think should happen in a quarter or two). Choosing the number of bundles to split into is easily done in SDK code - see https://github.com/apache/beam/pull/5886 for an example; DatastoreIO does something similar. The remaining thing to analyze is when initial scaling matters.

So, as a member of the Dataflow team, I analyzed statistics of production Dataflow jobs over the past month. I cannot share my queries or the data, because they are proprietary to Google, so I am sharing just the general methodology and conclusions, because they matter to the Beam community. I looked at a few criteria, such as:

- The job should be neither too short nor too long: if it's too short, scaling couldn't have kicked in much at all; if it's too long, dynamic autoscaling would have been sufficient.
- The job should use, at peak, at least a handful of workers (otherwise it wasn't running in a setting where much scaling happened).

After a couple more rounds of narrowing down, with some hand-checking that the results and criteria so far made sense, I ended up with nothing: no jobs that would have suffered a serious performance regression if their BoundedSource had not supported initial size estimation [except, of course, the liquid-shardable ones].

Based on this, I would like to propose converting the following BoundedSource-based IOs to ParDo-based ones, and while we're at it, probably also adding readAll() versions (not necessarily in exactly the same PR):

- ElasticsearchIO
- SolrIO
- MongoDbIO
- MongoDbGridFSIO
- CassandraIO
- HCatalogIO
- HadoopInputFormatIO
- UnboundedToBoundedSourceAdapter (I already have a PR in progress for this one)

These would not translate to a single ParDo - rather, they'd translate to ParDo(estimate size and split according to the formula), Reshuffle, ParDo(read data), as in the sketches below - or possibly to a bounded SDF doing roughly the same (luckily, after https://github.com/apache/beam/pull/5940, all runners at master will support bounded SDF, so this is safe compatibility-wise). Pretty much like DatastoreIO does it.

I would also like to propose changing the IO authoring guide https://beam.apache.org/documentation/io/authoring-overview/#when-to-implement-using-the-source-api to basically say "never implement a new BoundedSource unless you can support liquid sharding", and adding a utility for computing a desired number of splits (also sketched below).
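To make this concrete, here's roughly what I have in mind. First, a minimal sketch of the split-count utility, assuming a heuristic in the spirit of K*sqrt(size) - the constant K and the clamping bounds below are made-up illustrative values, not Dataflow's actual ones:

  // Hypothetical heuristic "not entirely unlike K*sqrt(size)".
  // K and the clamping bounds are illustrative assumptions only.
  static int desiredNumSplits(long estimatedSizeBytes) {
    double sizeMb = estimatedSizeBytes / (double) (1 << 20);
    int splits = (int) Math.ceil(0.1 * Math.sqrt(sizeMb)); // assumed K = 0.1
    return Math.max(1, Math.min(splits, 10000)); // keep within a sane range
  }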
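And a sketch of the overall transform shape - ParDo(split), Reshuffle, ParDo(read). Query, QueryShard, Record and Backend below are placeholder stand-ins for a concrete connector's types and client calls, and coders/error handling are omitted; this is just to show the structure, not a real implementation:

  import java.util.Collections;
  import java.util.List;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
  import org.apache.beam.sdk.transforms.PTransform;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.transforms.Reshuffle;
  import org.apache.beam.sdk.values.PCollection;

  public class ReadAllViaParDo
      extends PTransform<PCollection<Query>, PCollection<Record>> {

    @Override
    public PCollection<Record> expand(PCollection<Query> queries) {
      return queries
          // Cheap: estimate size and split according to the formula.
          .apply("Split", ParDo.of(new SplitFn()))
          // Rebalance the shards across workers and materialize them, so the
          // expensive reads below are parallelized even for a single query.
          .apply(Reshuffle.viaRandomKey())
          // Expensive: actually read the data for each shard.
          .apply("Read", ParDo.of(new ReadFn()));
    }

    static class SplitFn extends DoFn<Query, QueryShard> {
      @ProcessElement
      public void process(ProcessContext c) {
        long sizeBytes = Backend.estimateSizeBytes(c.element());
        for (QueryShard shard :
            Backend.split(c.element(), desiredNumSplits(sizeBytes))) {
          c.output(shard);
        }
      }
    }

    static class ReadFn extends DoFn<QueryShard, Record> {
      @ProcessElement
      public void process(ProcessContext c) {
        for (Record record : Backend.read(c.element())) {
          c.output(record);
        }
      }
    }

    static int desiredNumSplits(long sizeBytes) {
      // E.g. the heuristic sketched above.
      return Math.max(1, (int) Math.ceil(0.1 * Math.sqrt(sizeBytes / 1e6)));
    }

    // Placeholders for whatever a concrete connector would use.
    static class Query implements java.io.Serializable {}
    static class QueryShard implements java.io.Serializable {}
    static class Record implements java.io.Serializable {}
    static class Backend {
      static long estimateSizeBytes(Query q) { return 0; } // e.g. a stats API call
      static List<QueryShard> split(Query q, int n) { return Collections.emptyList(); }
      static List<Record> read(QueryShard s) { return Collections.emptyList(); }
    }
  }

Note that this shape gives you readAll() essentially for free: the input is already a PCollection of queries, so a plain read(query) is just Create.of(query) followed by this transform.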
There are probably more details to iron out, but I wanted to first check with the community that this overall makes sense. Thanks.