Hey all,

The PR https://github.com/apache/beam/pull/5940 was merged, and now all runners at "master" support bounded-per-element SDFs!
Thanks +Ismaël Mejía <ieme...@gmail.com> for the reviews.
I have updated the Capability Matrix as well: https://beam.apache.org/documentation/runners/capability-matrix/

On Mon, Jul 16, 2018 at 7:56 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> Hi guys,
>
> I think it's the purpose of SDF to simplify writing BoundedSource-like IOs.
>
> I agree that an extended @SplitRestriction is a good approach.
>
> Regards
> JB
>
> On 16/07/2018 16:52, Eugene Kirpichov wrote:
> > Hi Etienne - thanks for catching this; indeed, I somehow missed that
> > several runners actually do this same thing. It seemed to me like
> > something that could be done in user code (because it involves combining
> > estimated size + split in pretty much the same way), but I'm not so
> > sure: even though many runners have a "desired parallelism" option or
> > similar, not all of them do, so we can't use such an option universally.
> >
> > Maybe then the right thing to do is to:
> > - Use bounded SDFs for these
> > - Change the SDF @SplitRestriction API to take a desired number of splits as
> > a parameter, and introduce an API @EstimateOutputSizeBytes(element)
> > valid only on bounded SDFs
> > - Add some plumbing to the standard bounded SDF expansion so that
> > different runners can compute that parameter differently, the two
> > standard ways being "split into a given number of splits" or "split based
> > on the sub-linear formula of estimated size".
> >
> > I think this would work, though this is somewhat more work than I
> > anticipated. Any alternative ideas?
> >
> > On Mon, Jul 16, 2018 at 3:07 AM Etienne Chauchot <echauc...@apache.org
> > <mailto:echauc...@apache.org>> wrote:
> >
> > Hi,
> > thanks Eugene for analyzing and sharing that.
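Eugene's suggestion quoted above, a @SplitRestriction that receives a desired number of splits from the runner, can be illustrated without Beam. The following is a minimal plain-Java sketch under stated assumptions: the class DesiredSplits, its Range type and its split() method are hypothetical names, not Beam's API, and a restriction is modeled as a half-open offset range [from, to). It shows what such a method could do with the runner-supplied number: cut the range into that many contiguous pieces whose sizes differ by at most one.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (not Beam's API): what a @SplitRestriction that receives
 * a desired number of splits might do with an offset-range restriction.
 */
public class DesiredSplits {

  /** A half-open offset range [from, to), standing in for a restriction. */
  static final class Range {
    final long from;
    final long to;

    Range(long from, long to) {
      if (to < from) {
        throw new IllegalArgumentException("to < from");
      }
      this.from = from;
      this.to = to;
    }

    long size() {
      return to - from;
    }

    @Override
    public String toString() {
      return "[" + from + ", " + to + ")";
    }
  }

  /**
   * Splits the range into at most desiredNumSplits contiguous pieces whose sizes
   * differ by at most one. An empty input range is returned as a single piece.
   */
  static List<Range> split(Range range, int desiredNumSplits) {
    if (desiredNumSplits < 1) {
      throw new IllegalArgumentException("desiredNumSplits must be >= 1");
    }
    long size = range.size();
    // Never produce more pieces than there are offsets (and at least one piece).
    long n = Math.max(1, Math.min(desiredNumSplits, size));
    long base = size / n;
    long remainder = size % n;
    List<Range> pieces = new ArrayList<>();
    long start = range.from;
    for (long i = 0; i < n; i++) {
      // The first `remainder` pieces each take one extra offset.
      long end = start + base + (i < remainder ? 1 : 0);
      pieces.add(new Range(start, end));
      start = end;
    }
    return pieces;
  }

  public static void main(String[] args) {
    // A runner asking for 3 splits of [0, 10):
    System.out.println(split(new Range(0, 10), 3)); // [[0, 4), [4, 7), [7, 10)]
  }
}
```

With this shape, the two standard strategies Eugene lists would differ only in how the runner computes desiredNumSplits; the splitting code itself stays the same.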
> > I have one comment inline
> >
> > Etienne
> >
> > On Sunday, July 15, 2018 at 14:20 -0700, Eugene Kirpichov wrote:
> >> Hey beamers,
> >>
> >> I've always wondered whether the BoundedSource implementations in
> >> the Beam SDK are worth their complexity, or whether they could
> >> instead be converted to the much easier to code ParDo style, which
> >> is also more modular and allows you to very easily implement
> >> readAll().
> >>
> >> There are a handful: file-based sources, BigQuery, Bigtable, HBase,
> >> Elasticsearch, MongoDB, Solr and a couple more.
> >>
> >> Curiously enough, BoundedSource vs. ParDo matters *only* on
> >> Dataflow, because AFAICT Dataflow is the only runner that cares
> >> about the things that BoundedSource can do and ParDo can't:
> >> - size estimation (used to choose an initial number of workers)
> >> [ok, Flink calls the function to return statistics, but doesn't
> >> seem to do anything else with it]
> > => Spark uses size estimation to set the desired bundle size with
> > something like desiredBundleSize = estimatedSize / nbOfWorkersConfigured (partitions)
> > See
> > https://github.com/apache/beam/blob/a5634128d194161aebc8d03229fdaa1066cf7739/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java#L101
> >
> >> - splitting into bundles of given size (Dataflow chooses the
> >> number of bundles to create based on a simple formula that's not
> >> entirely unlike K*sqrt(size))
> >> - liquid sharding (splitAtFraction())
> >>
> >> If Dataflow didn't exist, there'd be no reason at all to use
> >> BoundedSource. So the question "which ones can be converted to
> >> ParDo" is really "which ones are used on Dataflow in ways that
> >> make these functions matter". Previously, my conservative
> >> assumption was that the answer is "all of them", but it turns out
> >> this is not so.
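Etienne's inline comment and Eugene's list both describe turning a source's estimated size into split parameters. Below is a minimal plain-Java sketch of the two heuristics. The class and method names, bytes as the size unit, the constant k and the maxBundles cap are all assumptions for illustration: Dataflow's actual formula is only described as "not entirely unlike K*sqrt(size)" and is not reproduced here, and the Spark variant is a simplification of the linked SourceRDD code.

```java
/**
 * Hypothetical sketch of two ways a runner might derive split parameters from a
 * source's estimated size. Names, units (bytes) and constants are illustrative.
 */
public class SplitHeuristics {

  /** Spark-style: divide the estimated size evenly across the configured partitions. */
  static long desiredBundleSizeBytes(long estimatedSizeBytes, int numPartitions) {
    if (numPartitions < 1) {
      throw new IllegalArgumentException("numPartitions must be >= 1");
    }
    // Avoid a zero bundle size for very small inputs.
    return Math.max(1, estimatedSizeBytes / numPartitions);
  }

  /** Sub-linear: the number of bundles grows roughly like k * sqrt(size), capped. */
  static int desiredNumBundles(long estimatedSizeBytes, double k, int maxBundles) {
    long n = Math.round(k * Math.sqrt((double) estimatedSizeBytes));
    // Clamp to [1, maxBundles].
    return (int) Math.max(1, Math.min(n, maxBundles));
  }

  public static void main(String[] args) {
    long oneGiB = 1L << 30;
    System.out.println(desiredBundleSizeBytes(oneGiB, 8));           // 134217728
    System.out.println(desiredNumBundles(oneGiB, 0.01, 10_000));     // 328
    System.out.println(desiredNumBundles(4 * oneGiB, 0.01, 10_000)); // 655
  }
}
```

The two rules scale differently: dividing by a fixed partition count keeps the number of bundles constant and grows each bundle with the input, while under the sub-linear rule quadrupling the estimated size only roughly doubles the number of bundles.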
> >>
> >> Liquid sharding always matters; if the source is liquid-shardable,
> >> for now we have to keep it a source (until SDF gains liquid
> >> sharding - which should happen in a quarter or two I think).
> >>
> >> Choosing the number of bundles to split into is easily done in SDK
> >> code, see https://github.com/apache/beam/pull/5886 for example;
> >> DatastoreIO does something similar.
> >>
> >> The remaining thing to analyze is when initial scaling
> >> matters. So as a member of the Dataflow team, I analyzed statistics
> >> of production Dataflow jobs in the past month. I cannot share my
> >> queries or the data, because they are proprietary to Google - so
> >> I am sharing just the general methodology and conclusions, because
> >> they matter to the Beam community. I looked at a few criteria,
> >> such as:
> >> - The job should be neither too short nor too long: if it's too
> >> short then scaling couldn't have kicked in much at all; if it's
> >> too long then dynamic autoscaling would have been sufficient.
> >> - The job should use, at peak, at least a handful of workers
> >> (otherwise it wasn't used in a setting where much scaling
> >> happened).
> >> After a couple more rounds of narrowing-down, with some
> >> hand-checking that the results and criteria so far make sense, I
> >> ended up with nothing - no jobs that would have suffered a serious
> >> performance regression if their BoundedSource had not supported
> >> initial size estimation [of course, except for the
> >> liquid-shardable ones].
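Liquid sharding is the one capability the thread says still requires a BoundedSource: splitAtFraction() lets the runner shrink a reader's range while it is running and hand the remainder to another worker. The following is a simplified plain-Java sketch of that idea, not Beam's OffsetRangeTracker or BoundedReader. The class LiquidSplitSketch and its methods are hypothetical, fractions are taken relative to the reader's current range, and claims are assumed (not enforced) to arrive in increasing order.

```java
import java.util.OptionalLong;

/**
 * Hypothetical, simplified illustration of liquid sharding (dynamic splitting)
 * of an offset range [start, end) while it is being read. Not Beam's API.
 */
public class LiquidSplitSketch {
  private final long start;
  private long end;         // shrinks when a split is accepted
  private long lastClaimed; // last offset handed to the reader, or start - 1

  LiquidSplitSketch(long start, long end) {
    this.start = start;
    this.end = end;
    this.lastClaimed = start - 1;
  }

  /** Claims an offset for reading; fails once the (possibly shrunk) range is exhausted. */
  boolean tryClaim(long offset) {
    if (offset >= end) {
      return false;
    }
    lastClaimed = offset; // simplified: assumes offsets are claimed in increasing order
    return true;
  }

  /**
   * Tries to split at the given fraction of the current range. On success this
   * reader keeps [start, splitPos) and the returned splitPos marks the start of
   * the residual [splitPos, oldEnd) that another worker could read.
   */
  OptionalLong trySplitAtFraction(double fraction) {
    if (fraction <= 0 || fraction >= 1) {
      return OptionalLong.empty();
    }
    long splitPos = start + (long) Math.ceil(fraction * (end - start));
    // Reject splits that would cut into already-claimed work or leave no residual.
    if (splitPos <= lastClaimed || splitPos >= end) {
      return OptionalLong.empty();
    }
    end = splitPos;
    return OptionalLong.of(splitPos);
  }

  public static void main(String[] args) {
    LiquidSplitSketch tracker = new LiquidSplitSketch(0, 100);
    for (long i = 0; i < 10; i++) {
      tracker.tryClaim(i); // the reader has processed offsets 0..9
    }
    System.out.println(tracker.trySplitAtFraction(0.5));  // OptionalLong[50]
    System.out.println(tracker.tryClaim(60));             // false: 60 now belongs to the residual
    System.out.println(tracker.trySplitAtFraction(0.05)); // OptionalLong.empty
  }
}
```

A plain ParDo has no hook for this kind of runtime rebalancing, which is why the thread keeps liquid-shardable sources as BoundedSources until SDF gains it.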
> >>
> >> Based on this, I would like to propose converting the following
> >> BoundedSource-based IOs to ParDo-based ones, and while we're at it,
> >> probably also adding readAll() versions (not necessarily in exactly
> >> the same PR):
> >> - ElasticsearchIO
> >> - SolrIO
> >> - MongoDbIO
> >> - MongoDbGridFSIO
> >> - CassandraIO
> >> - HCatalogIO
> >> - HadoopInputFormatIO
> >> - UnboundedToBoundedSourceAdapter (I already have a PR in progress
> >> for this one)
> >> These would not translate to a single ParDo - rather, they'd
> >> translate to ParDo(estimate size and split according to the
> >> formula), Reshuffle, ParDo(read data) - or possibly to a bounded
> >> SDF doing roughly the same (luckily after
> >> https://github.com/apache/beam/pull/5940 all runners at master
> >> will support bounded SDF so this is safe compatibility-wise).
> >> Pretty much like DatastoreIO does.
> >>
> >> I would also like to propose changing the IO authoring guide
> >> https://beam.apache.org/documentation/io/authoring-overview/#when-to-implement-using-the-source-api
> >> to basically say "Never implement a new BoundedSource unless you can
> >> support liquid sharding", and adding a utility for computing a
> >> desired number of splits.
> >>
> >> There might be some more details here to iron out, but I wanted to
> >> check with the community that this overall makes sense.
> >>
> >> Thanks.
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
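The expansion Eugene proposes, ParDo(estimate size and split), Reshuffle, ParDo(read data), can be simulated in plain Java to show how it spreads one read across workers. This sketch is hypothetical and is not Beam code: in-memory lists stand in for PCollections, a "query" is just an offset range [0, numRecords), and the Reshuffle step is modeled as sending each shard to a randomly chosen worker. The class ExpansionSketch and all its methods are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * Hypothetical plain-Java simulation of ParDo(split) -> Reshuffle -> ParDo(read).
 * Lists stand in for PCollections; none of this is Beam API.
 */
public class ExpansionSketch {

  /** Step 1: split a "query" over offsets [0, numRecords) into roughly equal shards. */
  static List<long[]> estimateAndSplit(long numRecords, int desiredNumSplits) {
    long n = Math.max(1, Math.min(desiredNumSplits, numRecords));
    List<long[]> shards = new ArrayList<>();
    for (long i = 0; i < n; i++) {
      // Proportional boundaries: shard i covers [numRecords*i/n, numRecords*(i+1)/n).
      shards.add(new long[] {numRecords * i / n, numRecords * (i + 1) / n});
    }
    return shards;
  }

  /** Step 2: stand-in for Reshuffle, sending each shard to a randomly chosen worker. */
  static List<List<long[]>> reshuffle(List<long[]> shards, int numWorkers, long seed) {
    Random random = new Random(seed);
    List<List<long[]>> workers = new ArrayList<>();
    for (int w = 0; w < numWorkers; w++) {
      workers.add(new ArrayList<>());
    }
    for (long[] shard : shards) {
      workers.get(random.nextInt(numWorkers)).add(shard);
    }
    return workers;
  }

  /** Step 3: "read" the shards assigned to one worker; here a record is just its offset. */
  static List<Long> read(List<long[]> shards) {
    List<Long> records = new ArrayList<>();
    for (long[] shard : shards) {
      for (long offset = shard[0]; offset < shard[1]; offset++) {
        records.add(offset);
      }
    }
    return records;
  }

  public static void main(String[] args) {
    List<List<long[]>> workers = reshuffle(estimateAndSplit(1_000, 7), 3, 42L);
    long total = 0;
    for (List<long[]> worker : workers) {
      total += read(worker).size();
    }
    System.out.println(total); // 1000: every record is read exactly once
  }
}
```

Because the first step takes a query as its input rather than being fixed at pipeline construction, applying the same three steps to a collection of queries would give a readAll() with little extra work, which is one of the motivations the thread gives for the ParDo style.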