Hey all,

The PR https://github.com/apache/beam/pull/5940 was merged, and now all
runners at "master" support bounded-per-element SDFs!
Thanks +Ismaël Mejía <ieme...@gmail.com> for the reviews.
I have updated the Capability Matrix as well:
https://beam.apache.org/documentation/runners/capability-matrix/


On Mon, Jul 16, 2018 at 7:56 AM Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

> Hi guys,
>
> I think it's the purpose of SDF to simplify the BoundedSource like writing.
>
> I agree that extended @SplitRestriction is a good approach.
>
> Regards
> JB
>
> On 16/07/2018 16:52, Eugene Kirpichov wrote:
> > Hi Etienne - thanks for catching this; indeed, I somehow missed that
> > actually several runners do this same thing - it seemed to me as
> > something that can be done in user code (because it involves combining
> > estimated size + split in pretty much the same way), but I'm not so
> > sure: even though many runners have a "desired parallelism" option or
> > alike, it's not all of them, so we can't use such an option universally.
> >
> > Maybe then the right thing to do is to:
> > - Use bounded SDFs for these
> > - Change SDF @SplitRestriction API to take a desired number of splits as
> > a parameter, and introduce an API @EstimateOutputSizeBytes(element)
> > valid only on bounded SDFs
> > - Add some plumbing to the standard bounded SDF expansion so that
> > different runners can compute that parameter differently, the two
> > standard ways being "split into given number of splits" or "split based
> > on the sub-linear formula of estimated size".
> >
> > I think this would work, though this is somewhat more work than I
> > anticipated. Any alternative ideas?
> >
> > On Mon, Jul 16, 2018 at 3:07 AM Etienne Chauchot <echauc...@apache.org
> > <mailto:echauc...@apache.org>> wrote:
> >
> >     Hi,
> >     thanks Eugene for analyzing and sharing that.
> >     I have one comment inline
> >
> >     Etienne
> >
> >     Le dimanche 15 juillet 2018 à 14:20 -0700, Eugene Kirpichov a écrit :
> >>     Hey beamers,
> >>
> >>     I've always wondered whether the BoundedSource implementations in
> >>     the Beam SDK are worth their complexity, or whether they rather
> >>     could be converted to the much easier to code ParDo style, which
> >>     is also more modular and allows you to very easily implement
> >>     readAll().
> >>
> >>     There's a handful: file-based sources, BigQuery, Bigtable, HBase,
> >>     Elasticsearch, MongoDB, Solr and a couple more.
> >>
> >>     Curiously enough, BoundedSource vs. ParDo matters *only* on
> >>     Dataflow, because AFAICT Dataflow is the only runner that cares
> >>     about the things that BoundedSource can do and ParDo can't:
> >>     - size estimation (used to choose an initial number of workers)
> >>     [ok, Flink calls the function to return statistics, but doesn't
> >>     seem to do anything else with it]
> >     => Spark uses size estimation to set desired bundle size with
> >     something like desiredBundleSize = estimatedSize /
> >     nbOfWorkersConfigured (partitions)
> >     See
> >
> https://github.com/apache/beam/blob/a5634128d194161aebc8d03229fdaa1066cf7739/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java#L101
> >
> >
> >>     - splitting into bundles of given size (Dataflow chooses the
> >>     number of bundles to create based on a simple formula that's not
> >>     entirely unlike K*sqrt(size))
> >>     - liquid sharding (splitAtFraction())
> >>
> >>     If Dataflow didn't exist, there'd be no reason at all to use
> >>     BoundedSource. So the question "which ones can be converted to
> >>     ParDo" is really "which ones are used on Dataflow in ways that
> >>     make these functions matter". Previously, my conservative
> >>     assumption was that the answer is "all of them", but turns out
> >>     this is not so.
> >>
> >>     Liquid sharding always matters; if the source is liquid-shardable,
> >>     for now we have to keep it a source (until SDF gains liquid
> >>     sharding - which should happen in a quarter or two I think).
> >>
> >>     Choosing number of bundles to split into is easily done in SDK
> >>     code, see https://github.com/apache/beam/pull/5886 for example;
> >>     DatastoreIO does something similar.
> >>
> >>     The remaining thing to analyze is, when does initial scaling
> >>     matter. So as a member of the Dataflow team, I analyzed statistics
> >>     of production Dataflow jobs in the past month. I can not share my
> >>     queries nor the data, because they are proprietary to Google - so
> >>     I am sharing just the general methodology and conclusions, because
> >>     they matter to the Beam community. I looked at a few criteria,
> >>     such as:
> >>     - The job should be not too short and not too long: if it's too
> >>     short then scaling couldn't have kicked in much at all; if it's
> >>     too long then dynamic autoscaling would have been sufficient.
> >>     - The job should use, at peak, at least a handful of workers
> >>     (otherwise means it wasn't used in settings where much scaling
> >>     happened)
> >>     After a couple more rounds of narrowing-down, with some
> >>     hand-checking that the results and criteria so far make sense, I
> >>     ended up with nothing - no jobs that would have suffered a serious
> >>     performance regression if their BoundedSource had not supported
> >>     initial size estimation [of course, except for the
> >>     liquid-shardable ones].
> >>
> >>     Based on this, I would like to propose to convert the following
> >>     BoundedSource-based IOs to ParDo-based, and while we're at it,
> >>     probably also add readAll() versions (not necessarily in exactly
> >>     the same PR):
> >>     - ElasticsearchIO
> >>     - SolrIO
> >>     - MongoDbIO
> >>     - MongoDbGridFSIO
> >>     - CassandraIO
> >>     - HCatalogIO
> >>     - HadoopInputFormatIO
> >>     - UnboundedToBoundedSourceAdapter (already have a PR in progress
> >>     for this one)
> >>     These would not translate to a single ParDo - rather, they'd
> >>     translate to ParDo(estimate size and split according to the
> >>     formula), Reshuffle, ParDo(read data) - or possibly to a bounded
> >>     SDF doing roughly the same (luckily after
> >>     https://github.com/apache/beam/pull/5940 all runners at master
> >>     will support bounded SDF so this is safe compatibility-wise).
> >>     Pretty much like DatastoreIO does.
> >>
> >>     I would like to also propose to change the IO authoring
> >>     guide
> https://beam.apache.org/documentation/io/authoring-overview/#when-to-implement-using-the-source-api
>  to
> >>     basically say "Never implement a new BoundedSource unless you can
> >>     support liquid sharding". And add a utility for computing a
> >>     desired number of splits.
> >>
> >>     There might be some more details here to iron out, but I wanted to
> >>     check with the community that this overall makes sense.
> >>
> >>     Thanks.
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>

Reply via email to