Hey beamers,

I've always wondered whether the BoundedSource implementations in the Beam
SDK are worth their complexity, or whether they could instead be converted
to the much easier-to-code ParDo style, which is also more modular and
makes it very easy to implement readAll().

There's a handful: file-based sources, BigQuery, Bigtable, HBase,
Elasticsearch, MongoDB, Solr and a couple more.

Curiously enough, BoundedSource vs. ParDo matters *only* on Dataflow,
because AFAICT Dataflow is the only runner that cares about the things that
BoundedSource can do and ParDo can't:
- size estimation (used to choose an initial number of workers) [granted,
Flink calls the function to get statistics, but doesn't seem to do anything
else with them]
- splitting into bundles of a given size (Dataflow chooses the number of
bundles to create based on a simple formula that's not entirely unlike
K*sqrt(size))
- liquid sharding (splitAtFraction())
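
For reference, here is roughly where those three hooks live - a
trimmed-down, illustrative paraphrase of org.apache.beam.sdk.io.BoundedSource,
with the classes renamed to make clear this is a sketch and not the real
class; inheritance and all other members are elided:

import java.util.List;
import org.apache.beam.sdk.options.PipelineOptions;

// Illustrative, trimmed-down view of BoundedSource: only the three
// capabilities discussed above.
public abstract class BoundedSourceHooks<T> {

  // Size estimation: Dataflow uses this to pick the initial number of workers.
  public abstract long getEstimatedSizeBytes(PipelineOptions options)
      throws Exception;

  // Initial splitting into bundles of roughly the desired size.
  public abstract List<? extends BoundedSourceHooks<T>> split(
      long desiredBundleSizeBytes, PipelineOptions options) throws Exception;

  // Liquid sharding lives on the reader, not the source.
  public abstract static class BoundedReaderHooks<T> {
    // Split off the unread remainder of the current shard at the given
    // fraction; the real BoundedReader's default implementation declines
    // to split.
    public abstract BoundedSourceHooks<T> splitAtFraction(double fraction);
  }
}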

If Dataflow didn't exist, there'd be no reason at all to use BoundedSource.
So the question "which ones can be converted to ParDo" is really "which
ones are used on Dataflow in ways that make these functions matter".
Previously, my conservative assumption was that the answer is "all of
them", but it turns out this is not so.

Liquid sharding always matters: if a source is liquid-shardable, for now
we have to keep it a source (until SDF gains liquid sharding, which I
expect to happen within a quarter or two).

Choosing the number of bundles to split into is easily done in SDK code -
see https://github.com/apache/beam/pull/5886 for an example; DatastoreIO
does something similar.
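
For illustration, a minimal sketch of what such a utility might look like -
the constant and the cap below are my own made-up values, not numbers taken
from Dataflow or from that PR:

// Hypothetical helper for choosing a number of splits from an estimated
// size, loosely mimicking a K*sqrt(size)-style formula. All constants are
// illustrative assumptions.
public class SplitMath {
  public static int desiredNumSplits(long estimatedSizeBytes) {
    double k = 0.01; // made-up tuning constant
    long numSplits = Math.round(k * Math.sqrt((double) estimatedSizeBytes));
    // Floor of 1 so tiny inputs still produce a bundle; cap so huge inputs
    // don't explode into millions of bundles.
    return (int) Math.min(Math.max(1L, numSplits), 10_000L);
  }
}

With these made-up numbers, a 1 GB input yields ~316 splits and a 1 MB
input ~10 - sublinear growth with a floor of one split, which is the
general shape you want.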

The remaining thing to analyze is when initial scaling matters. As a
member of the Dataflow team, I analyzed statistics of production Dataflow
jobs over the past month. I cannot share my queries or the data, because
they are proprietary to Google, so I am sharing just the general
methodology and conclusions, because they matter to the Beam community. I
looked at a few criteria, such as:
- The job should be not too short and not too long: if it's too short, then
initial scaling couldn't have kicked in much at all; if it's too long, then
dynamic autoscaling would have been sufficient.
- The job should use, at peak, at least a handful of workers (otherwise it
wasn't run in a setting where much scaling happened).
After a couple more rounds of narrowing down, with some hand-checking that
the results and criteria so far made sense, I ended up with nothing: no
jobs that would have suffered a serious performance regression if their
BoundedSource had not supported initial size estimation [except, of course,
the liquid-shardable ones].

Based on this, I would like to propose converting the following
BoundedSource-based IOs to ParDo-based ones and, while we're at it,
probably also adding readAll() versions (not necessarily in the same PR):
- ElasticsearchIO
- SolrIO
- MongoDbIO
- MongoDbGridFSIO
- CassandraIO
- HCatalogIO
- HadoopInputFormatIO
- UnboundedToBoundedSourceAdapter (I already have a PR in progress for
this one)
These would not translate to a single ParDo - rather, they'd translate to
ParDo(estimate size and split according to the formula), Reshuffle,
ParDo(read data) - or possibly to a bounded SDF doing roughly the same
(luckily, after https://github.com/apache/beam/pull/5940, all runners at
master will support bounded SDF, so this is safe compatibility-wise).
Pretty much like DatastoreIO does.
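
To make that shape concrete, here is a minimal sketch of such an
expansion. ReadParams, Shard, Result, splitIntoShards() and readShard()
are hypothetical placeholders for the IO-specific pieces;
Reshuffle.viaRandomKey() is the existing Beam transform:

import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical placeholder types for the IO-specific pieces.
class ReadParams implements Serializable {}
class Shard implements Serializable {}
class Result implements Serializable {}

public class ReadViaParDo
    extends PTransform<PCollection<ReadParams>, PCollection<Result>> {

  @Override
  public PCollection<Result> expand(PCollection<ReadParams> input) {
    return input
        // Estimate the size and split according to the formula.
        .apply("Split", ParDo.of(new SplitFn()))
        // Decouple splitting from reading, so shards get rebalanced
        // across workers instead of being read where they were produced.
        .apply(Reshuffle.<Shard>viaRandomKey())
        // Read each shard independently.
        .apply("Read", ParDo.of(new ReadFn()));
  }

  static class SplitFn extends DoFn<ReadParams, Shard> {
    @ProcessElement
    public void process(ProcessContext c) {
      for (Shard shard : splitIntoShards(c.element())) {
        c.output(shard);
      }
    }
  }

  static class ReadFn extends DoFn<Shard, Result> {
    @ProcessElement
    public void process(ProcessContext c) {
      for (Result result : readShard(c.element())) {
        c.output(result);
      }
    }
  }

  // IO-specific logic would go here; stubbed out for the sketch.
  static List<Shard> splitIntoShards(ReadParams params) {
    return Collections.emptyList();
  }

  static Iterable<Result> readShard(Shard shard) {
    return Collections.emptyList();
  }
}

Note that a transform of this shape naturally takes a PCollection of read
parameters as input, which is exactly why readAll() falls out almost for
free.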

I would also like to propose changing the IO authoring guide
https://beam.apache.org/documentation/io/authoring-overview/#when-to-implement-using-the-source-api
to basically say "Never implement a new BoundedSource unless you can
support liquid sharding", and adding a utility for computing a desired
number of splits.

There might be some more details here to iron out, but I wanted to check
with the community that this overall makes sense.

Thanks.
