Hi Etienne - thanks for catching this; indeed, I somehow missed that
actually several runners do this same thing - it seemed to me as something
that can be done in user code (because it involves combining estimated size
+ split in pretty much the same way), but I'm not so sure: even though many
runners have a "desired parallelism" option or alike, it's not all of them,
so we can't use such an option universally.

Maybe then the right thing to do is to:
- Use bounded SDFs for these
- Change SDF @SplitRestriction API to take a desired number of splits as a
parameter, and introduce an API @EstimateOutputSizeBytes(element) valid
only on bounded SDFs
- Add some plumbing to the standard bounded SDF expansion so that different
runners can compute that parameter differently, the two standard ways being
"split into given number of splits" or "split based on the sub-linear
formula of estimated size".

I think this would work, though this is somewhat more work than I
anticipated. Any alternative ideas?

On Mon, Jul 16, 2018 at 3:07 AM Etienne Chauchot <echauc...@apache.org>
wrote:

> Hi,
> thanks Eugene for analyzing and sharing that.
> I have one comment inline
>
> Etienne
>
> Le dimanche 15 juillet 2018 à 14:20 -0700, Eugene Kirpichov a écrit :
>
> Hey beamers,
>
> I've always wondered whether the BoundedSource implementations in the Beam
> SDK are worth their complexity, or whether they rather could be converted
> to the much easier to code ParDo style, which is also more modular and
> allows you to very easily implement readAll().
>
> There's a handful: file-based sources, BigQuery, Bigtable, HBase,
> Elasticsearch, MongoDB, Solr and a couple more.
>
> Curiously enough, BoundedSource vs. ParDo matters *only* on Dataflow,
> because AFAICT Dataflow is the only runner that cares about the things that
> BoundedSource can do and ParDo can't:
> - size estimation (used to choose an initial number of workers) [ok, Flink
> calls the function to return statistics, but doesn't seem to do anything
> else with it]
>
> => Spark uses size estimation to set desired bundle size with something
> like desiredBundleSize = estimatedSize / nbOfWorkersConfigured (partitions)
> See
> https://github.com/apache/beam/blob/a5634128d194161aebc8d03229fdaa1066cf7739/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java#L101
>
>
> - splitting into bundles of given size (Dataflow chooses the number of
> bundles to create based on a simple formula that's not entirely unlike
> K*sqrt(size))
> - liquid sharding (splitAtFraction())
>
> If Dataflow didn't exist, there'd be no reason at all to use
> BoundedSource. So the question "which ones can be converted to ParDo" is
> really "which ones are used on Dataflow in ways that make these functions
> matter". Previously, my conservative assumption was that the answer is "all
> of them", but turns out this is not so.
>
> Liquid sharding always matters; if the source is liquid-shardable, for now
> we have to keep it a source (until SDF gains liquid sharding - which should
> happen in a quarter or two I think).
>
> Choosing number of bundles to split into is easily done in SDK code, see
> https://github.com/apache/beam/pull/5886 for example; DatastoreIO does
> something similar.
>
> The remaining thing to analyze is, when does initial scaling matter. So as
> a member of the Dataflow team, I analyzed statistics of production Dataflow
> jobs in the past month. I can not share my queries nor the data, because
> they are proprietary to Google - so I am sharing just the general
> methodology and conclusions, because they matter to the Beam community. I
> looked at a few criteria, such as:
> - The job should be not too short and not too long: if it's too short then
> scaling couldn't have kicked in much at all; if it's too long then dynamic
> autoscaling would have been sufficient.
> - The job should use, at peak, at least a handful of workers (otherwise
> means it wasn't used in settings where much scaling happened)
> After a couple more rounds of narrowing-down, with some hand-checking that
> the results and criteria so far make sense, I ended up with nothing - no
> jobs that would have suffered a serious performance regression if their
> BoundedSource had not supported initial size estimation [of course, except
> for the liquid-shardable ones].
>
> Based on this, I would like to propose to convert the following
> BoundedSource-based IOs to ParDo-based, and while we're at it, probably
> also add readAll() versions (not necessarily in exactly the same PR):
> - ElasticsearchIO
> - SolrIO
> - MongoDbIO
> - MongoDbGridFSIO
> - CassandraIO
> - HCatalogIO
> - HadoopInputFormatIO
> - UnboundedToBoundedSourceAdapter (already have a PR in progress for this
> one)
> These would not translate to a single ParDo - rather, they'd translate to
> ParDo(estimate size and split according to the formula), Reshuffle,
> ParDo(read data) - or possibly to a bounded SDF doing roughly the same
> (luckily after https://github.com/apache/beam/pull/5940 all runners at
> master will support bounded SDF so this is safe compatibility-wise). Pretty
> much like DatastoreIO does.
>
> I would like to also propose to change the IO authoring guide
> https://beam.apache.org/documentation/io/authoring-overview/#when-to-implement-using-the-source-api
>  to
> basically say "Never implement a new BoundedSource unless you can support
> liquid sharding". And add a utility for computing a desired number of
> splits.
>
> There might be some more details here to iron out, but I wanted to check
> with the community that this overall makes sense.
>
> Thanks.
>
>

Reply via email to