On Tue, Jul 17, 2018 at 2:49 AM Etienne Chauchot <echauc...@apache.org>
wrote:

> Hi Eugene
>
> Le lundi 16 juillet 2018 à 07:52 -0700, Eugene Kirpichov a écrit :
>
> Hi Etienne - thanks for catching this; indeed, I somehow missed that
> actually several runners do this same thing - it seemed to me as something
> that can be done in user code (because it involves combining estimated size
> + split in pretty much the same way),
>
>
> When you say "user code", you mean IO writter code by opposition to runner
> code right ?
>
Correct: "user code" is what happens in the SDK or the user pipeline.


>
>
> but I'm not so sure: even though many runners have a "desired parallelism"
> option or alike, it's not all of them, so we can't use such an option
> universally.
>
>
> Agree, cannot be universal
>
>
> Maybe then the right thing to do is to:
> - Use bounded SDFs for these
> - Change SDF @SplitRestriction API to take a desired number of splits as a
> parameter, and introduce an API @EstimateOutputSizeBytes(element) valid
> only on bounded SDFs
>
> Agree with the idea but EstimateOutpuSize must return the size of the
> dataset not of an element.
>
Please recall that the element here is e.g. a filename, or name of a
BigTable table, or something like that - i.e. the element describes the
dataset, and the restriction describes what part of the dataset.

If e.g. we have a PCollection<String> of filenames and apply a ReadTextFn
SDF to it, and want the runner to know the total size of all files - the
runner could insert some transforms to apply EstimateOutputSize to each
element and Sum.globally() them.


> On some runners, each worker is set to a given amount of heap. Thus, it is
> important that a runner could evaluate the size of the whole dataset to
> determine the size of each split (to fit in memory of the workers) and thus
> tell the bounded SDF the number of desired splits.
>
> - Add some plumbing to the standard bounded SDF expansion so that
> different runners can compute that parameter differently, the two standard
> ways being "split into given number of splits" or "split based on the
> sub-linear formula of estimated size".
>
> I think this would work, though this is somewhat more work than I
> anticipated. Any alternative ideas?
>
> +1 It will be very similar for an IO developer (@EstimateOutputSizeBytes
> will be similar to source.getEstimatedSizeBytes(),
> and @SplitRestriction(desiredSplits) similar to
> source.split(desiredBundleSize))
>
Yeah I'm not sure this is actually a good thing that these APIs end up so
similar to the old ones - I was hoping we could come up with something
better - but seems like there's no viable alternative at this point :)


>
> Etienne
>
>
> On Mon, Jul 16, 2018 at 3:07 AM Etienne Chauchot <echauc...@apache.org>
> wrote:
>
> Hi,
> thanks Eugene for analyzing and sharing that.
> I have one comment inline
>
> Etienne
>
> Le dimanche 15 juillet 2018 à 14:20 -0700, Eugene Kirpichov a écrit :
>
> Hey beamers,
>
> I've always wondered whether the BoundedSource implementations in the Beam
> SDK are worth their complexity, or whether they rather could be converted
> to the much easier to code ParDo style, which is also more modular and
> allows you to very easily implement readAll().
>
> There's a handful: file-based sources, BigQuery, Bigtable, HBase,
> Elasticsearch, MongoDB, Solr and a couple more.
>
> Curiously enough, BoundedSource vs. ParDo matters *only* on Dataflow,
> because AFAICT Dataflow is the only runner that cares about the things that
> BoundedSource can do and ParDo can't:
> - size estimation (used to choose an initial number of workers) [ok, Flink
> calls the function to return statistics, but doesn't seem to do anything
> else with it]
>
> => Spark uses size estimation to set desired bundle size with something
> like desiredBundleSize = estimatedSize / nbOfWorkersConfigured (partitions)
> See
> https://github.com/apache/beam/blob/a5634128d194161aebc8d03229fdaa1066cf7739/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java#L101
>
>
> - splitting into bundles of given size (Dataflow chooses the number of
> bundles to create based on a simple formula that's not entirely unlike
> K*sqrt(size))
> - liquid sharding (splitAtFraction())
>
> If Dataflow didn't exist, there'd be no reason at all to use
> BoundedSource. So the question "which ones can be converted to ParDo" is
> really "which ones are used on Dataflow in ways that make these functions
> matter". Previously, my conservative assumption was that the answer is "all
> of them", but turns out this is not so.
>
> Liquid sharding always matters; if the source is liquid-shardable, for now
> we have to keep it a source (until SDF gains liquid sharding - which should
> happen in a quarter or two I think).
>
> Choosing number of bundles to split into is easily done in SDK code, see
> https://github.com/apache/beam/pull/5886 for example; DatastoreIO does
> something similar.
>
> The remaining thing to analyze is, when does initial scaling matter. So as
> a member of the Dataflow team, I analyzed statistics of production Dataflow
> jobs in the past month. I can not share my queries nor the data, because
> they are proprietary to Google - so I am sharing just the general
> methodology and conclusions, because they matter to the Beam community. I
> looked at a few criteria, such as:
> - The job should be not too short and not too long: if it's too short then
> scaling couldn't have kicked in much at all; if it's too long then dynamic
> autoscaling would have been sufficient.
> - The job should use, at peak, at least a handful of workers (otherwise
> means it wasn't used in settings where much scaling happened)
> After a couple more rounds of narrowing-down, with some hand-checking that
> the results and criteria so far make sense, I ended up with nothing - no
> jobs that would have suffered a serious performance regression if their
> BoundedSource had not supported initial size estimation [of course, except
> for the liquid-shardable ones].
>
> Based on this, I would like to propose to convert the following
> BoundedSource-based IOs to ParDo-based, and while we're at it, probably
> also add readAll() versions (not necessarily in exactly the same PR):
> - ElasticsearchIO
> - SolrIO
> - MongoDbIO
> - MongoDbGridFSIO
> - CassandraIO
> - HCatalogIO
> - HadoopInputFormatIO
> - UnboundedToBoundedSourceAdapter (already have a PR in progress for this
> one)
> These would not translate to a single ParDo - rather, they'd translate to
> ParDo(estimate size and split according to the formula), Reshuffle,
> ParDo(read data) - or possibly to a bounded SDF doing roughly the same
> (luckily after https://github.com/apache/beam/pull/5940 all runners at
> master will support bounded SDF so this is safe compatibility-wise). Pretty
> much like DatastoreIO does.
>
> I would like to also propose to change the IO authoring guide
> https://beam.apache.org/documentation/io/authoring-overview/#when-to-implement-using-the-source-api
>  to
> basically say "Never implement a new BoundedSource unless you can support
> liquid sharding". And add a utility for computing a desired number of
> splits.
>
> There might be some more details here to iron out, but I wanted to check
> with the community that this overall makes sense.
>
> Thanks.
>
>

Reply via email to