On Sun, Jul 15, 2018 at 2:20 PM Eugene Kirpichov <kirpic...@google.com>
wrote:

> Hey beamers,
>
> I've always wondered whether the BoundedSource implementations in the Beam
> SDK are worth their complexity, or whether they rather could be converted
> to the much easier to code ParDo style, which is also more modular and
> allows you to very easily implement readAll().
>
> There's a handful: file-based sources, BigQuery, Bigtable, HBase,
> Elasticsearch, MongoDB, Solr and a couple more.
>
> Curiously enough, BoundedSource vs. ParDo matters *only* on Dataflow,
> because AFAICT Dataflow is the only runner that cares about the things that
> BoundedSource can do and ParDo can't:
> - size estimation (used to choose an initial number of workers) [ok, Flink
> calls the function to return statistics, but doesn't seem to do anything
> else with it]
> - splitting into bundles of given size (Dataflow chooses the number of
> bundles to create based on a simple formula that's not entirely unlike
> K*sqrt(size))
> - liquid sharding (splitAtFraction())
>
> If Dataflow didn't exist, there'd be no reason at all to use
> BoundedSource. So the question "which ones can be converted to ParDo" is
> really "which ones are used on Dataflow in ways that make these functions
> matter". Previously, my conservative assumption was that the answer is "all
> of them", but it turns out this is not so.
>
> Liquid sharding always matters; if the source is liquid-shardable, for now
> we have to keep it a source (until SDF gains liquid sharding - which should
> happen in a quarter or two I think).
>
> Choosing number of bundles to split into is easily done in SDK code, see
> https://github.com/apache/beam/pull/5886 for example; DatastoreIO does
> something similar.
>
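[The "simple formula that's not entirely unlike K*sqrt(size)" heuristic above can be sketched in plain Java. The constant K and the clamping bounds below are illustrative assumptions, not Dataflow's actual values:]

```java
public class SplitCount {
    // Hypothetical split-count heuristic: bundle count grows as K * sqrt(sizeBytes),
    // clamped to a sane range. K and the bounds [1, 10000] are made-up values
    // chosen only to illustrate the shape of such a formula.
    static long desiredNumSplits(long estimatedSizeBytes) {
        final double K = 0.01; // assumed tuning constant
        long n = (long) (K * Math.sqrt((double) estimatedSizeBytes));
        return Math.max(1L, Math.min(n, 10_000L)); // clamp to [1, 10000]
    }

    public static void main(String[] args) {
        System.out.println(desiredNumSplits(1L << 30)); // 1 GiB -> 327 bundles
        System.out.println(desiredNumSplits(0));        // clamps up to 1
    }
}
```

[Under these assumed constants, doubling the input size grows the bundle count only by sqrt(2), which is the point of such a formula: more parallelism for bigger inputs without exploding the number of tiny bundles.]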
> The remaining thing to analyze is, when does initial scaling matter. So as
> a member of the Dataflow team, I analyzed statistics of production Dataflow
> jobs in the past month. I cannot share my queries or the data, because
> they are proprietary to Google - so I am sharing just the general
> methodology and conclusions, because they matter to the Beam community. I
> looked at a few criteria, such as:
> - The job should be not too short and not too long: if it's too short then
> scaling couldn't have kicked in much at all; if it's too long then dynamic
> autoscaling would have been sufficient.
> - The job should use, at peak, at least a handful of workers (otherwise it
> means the job didn't run in a setting where much scaling happened).
> After a couple more rounds of narrowing-down, with some hand-checking that
> the results and criteria so far make sense, I ended up with nothing - no
> jobs that would have suffered a serious performance regression if their
> BoundedSource had not supported initial size estimation [of course, except
> for the liquid-shardable ones].
>
> Based on this, I would like to propose to convert the following
> BoundedSource-based IOs to ParDo-based, and while we're at it, probably
> also add readAll() versions (not necessarily in exactly the same PR):
> - ElasticsearchIO
> - SolrIO
> - MongoDbIO
> - MongoDbGridFSIO
> - CassandraIO
> - HCatalogIO
> - HadoopInputFormatIO
> - UnboundedToBoundedSourceAdapter (already have a PR in progress for this
> one)
> These would not translate to a single ParDo - rather, they'd translate to
> ParDo(estimate size and split according to the formula), Reshuffle,
> ParDo(read data) - or possibly to a bounded SDF doing roughly the same
> (luckily after https://github.com/apache/beam/pull/5940 all runners at
> master will support bounded SDF so this is safe compatibility-wise). Pretty
> much like DatastoreIO does.
>
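[The three-stage shape proposed above — ParDo(estimate size and split), Reshuffle, ParDo(read data) — can be mimicked outside Beam with plain collections. All names below are illustrative stand-ins, not Beam API; a real implementation would use ParDo and Reshuffle transforms:]

```java
import java.util.*;
import java.util.stream.*;

public class SplitThenRead {
    // Stand-in for a restriction of the source, e.g. a query plus a key range.
    record Shard(int lo, int hi) {}

    // Stage 1: "ParDo(estimate size and split)" — one source becomes many shards.
    static List<Shard> split(int totalRecords, int numShards) {
        int per = Math.max(1, totalRecords / numShards);
        List<Shard> shards = new ArrayList<>();
        for (int lo = 0; lo < totalRecords; lo += per) {
            shards.add(new Shard(lo, Math.min(lo + per, totalRecords)));
        }
        return shards;
    }

    // Stage 3: "ParDo(read data)" — each shard is read independently, which is
    // what lets a runner process shards in parallel after the Reshuffle barrier.
    static List<Integer> read(Shard s) {
        return IntStream.range(s.lo, s.hi).boxed().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Stage 2 (Reshuffle) is the runner's redistribution step; the flatMap
        // here merely stands in for fanning shards out to workers.
        List<Integer> out = split(10, 3).stream()
            .flatMap(s -> read(s).stream())
            .collect(Collectors.toList());
        System.out.println(out); // all 10 records, in shard order
    }
}
```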

I think there is some value in having a single API that one implements,
rather than having every IO manually implement the ParDo + Reshuffle +
ParDo pattern. Until these convert over to SDFs, I'm not sure there's a net
win to manually converting them to ParDos rather than automatically
executing BoundedSources as ParDos. (There are also the sizing hooks that
have been pointed out, and even for sources that don't yet support liquid
sharding, it'd be nice if, when someone wants to add liquid sharding to an
IO, they didn't have to go in and massively restructure things.)

But perhaps I'm underestimating the complexity of using the BoundedSource
API vs. manually writing a sequence of ParDos. Or is it that existing
BoundedSources
don't lend themselves well to readAll? (Would a readAll extends
PTransform<BoundedSource<T>, T> be that hard? Specific sources could
implement a nicer ReadAll that is built on this.)
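[The layering suggested here — one generic readAll over sources, with per-IO wrappers on top — can be illustrated with plain generics. The interface and method names below are made up for illustration; in Beam the generic transform would have the shape PTransform&lt;PCollection&lt;BoundedSource&lt;T&gt;&gt;, PCollection&lt;T&gt;&gt;:]

```java
import java.util.*;
import java.util.function.*;
import java.util.stream.*;

public class GenericReadAll {
    // Stand-in for BoundedSource<T>: something that knows how to produce records.
    interface Source<T> {
        List<T> readRecords();
    }

    // The generic "readAll": expand a collection of sources into their records.
    static <T> List<T> readAll(List<Source<T>> sources) {
        return sources.stream()
            .flatMap(s -> s.readRecords().stream())
            .collect(Collectors.toList());
    }

    // A specific IO's nicer ReadAll is then a thin wrapper: it maps its own
    // request type (here, a String "query") to sources and delegates.
    static List<Integer> queryReadAll(List<String> queries,
                                      Function<String, Source<Integer>> toSource) {
        return readAll(queries.stream().map(toSource).collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        // Toy "source": each query yields two records derived from its length.
        Function<String, Source<Integer>> toSource =
            q -> () -> List.of(q.length(), q.length() + 1);
        System.out.println(queryReadAll(List.of("ab", "cde"), toSource)); // [2, 3, 3, 4]
    }
}
```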

> I would like to also propose to change the IO authoring guide
> https://beam.apache.org/documentation/io/authoring-overview/#when-to-implement-using-the-source-api
> to
> basically say "Never implement a new BoundedSource unless you can support
> liquid sharding". And add a utility for computing a desired number of
> splits.
>
> There might be some more details here to iron out, but I wanted to check
> with the community that this overall makes sense.
>
> Thanks.
>
