On Tue, Jul 17, 2018 at 2:49 AM Etienne Chauchot <echauc...@apache.org> wrote:
> Hi Eugene
>
> On Monday, July 16, 2018 at 07:52 -0700, Eugene Kirpichov wrote:
>
> Hi Etienne - thanks for catching this; indeed, I somehow missed that
> actually several runners do this same thing - it seemed to me like
> something that can be done in user code (because it involves combining
> estimated size + split in pretty much the same way),
>
> When you say "user code", you mean IO writer code as opposed to runner
> code, right?

Correct: "user code" is what happens in the SDK or the user pipeline.

> but I'm not so sure: even though many runners have a "desired parallelism"
> option or similar, not all of them do, so we can't use such an option
> universally.
>
> Agree, cannot be universal
>
> Maybe then the right thing to do is to:
> - Use bounded SDFs for these
> - Change the SDF @SplitRestriction API to take a desired number of splits
> as a parameter, and introduce an API @EstimateOutputSizeBytes(element)
> valid only on bounded SDFs
>
> Agree with the idea, but @EstimateOutputSizeBytes must return the size of
> the dataset, not of an element.

Please recall that the element here is e.g. a filename, or the name of a
Bigtable table, or something like that - i.e. the element describes the
dataset, and the restriction describes what part of the dataset. If e.g. we
have a PCollection<String> of filenames and apply a ReadTextFn SDF to it,
and want the runner to know the total size of all files - the runner could
insert some transforms to apply @EstimateOutputSizeBytes to each element and
Sum.globally() them.

> On some runners, each worker is set to a given amount of heap. Thus, it is
> important that a runner can evaluate the size of the whole dataset to
> determine the size of each split (to fit in the workers' memory) and thus
> tell the bounded SDF the desired number of splits.
>
> - Add some plumbing to the standard bounded SDF expansion so that
> different runners can compute that parameter differently, the two standard
> ways being "split into a given number of splits" or "split based on the
> sub-linear formula of the estimated size".
>
> I think this would work, though it is somewhat more work than I
> anticipated. Any alternative ideas?
>
> +1 It will be very similar for an IO developer (@EstimateOutputSizeBytes
> will be similar to source.getEstimatedSizeBytes(), and
> @SplitRestriction(desiredSplits) similar to source.split(desiredBundleSize))

Yeah, I'm not sure it's actually a good thing that these APIs end up so
similar to the old ones - I was hoping we could come up with something
better - but it seems there's no viable alternative at this point :)
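For concreteness, here's a rough sketch of what one of these bounded SDFs
could look like with the proposed hooks. To be clear, this is hypothetical:
@EstimateOutputSizeBytes and the desiredSplits parameter of @SplitRestriction
don't exist in the SDK today, and ReadTextFn is just an illustrative example:

  import java.io.IOException;
  import org.apache.beam.sdk.io.FileSystems;
  import org.apache.beam.sdk.io.range.OffsetRange;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;

  // Hypothetical sketch: @EstimateOutputSizeBytes and the desiredSplits
  // parameter of @SplitRestriction are the proposed additions, not current API.
  class ReadTextFn extends DoFn<String, String> {

    @GetInitialRestriction
    public OffsetRange getInitialRestriction(String filename) throws IOException {
      // The element (a filename) describes the dataset; the restriction
      // describes which part of it to read.
      return new OffsetRange(0, FileSystems.matchSingleFileSpec(filename).sizeBytes());
    }

    // Proposed: lets a runner estimate the size of the dataset behind each
    // element, e.g. by summing over all elements to pick initial parallelism.
    @EstimateOutputSizeBytes
    public long estimateOutputSizeBytes(String filename) throws IOException {
      return FileSystems.matchSingleFileSpec(filename).sizeBytes();
    }

    // Proposed: the runner passes in the number of splits it wants, instead
    // of the SDF hard-coding one.
    @SplitRestriction
    public void splitRestriction(String filename, OffsetRange range, int desiredSplits,
        OutputReceiver<OffsetRange> receiver) {
      long chunk = Math.max(1, (range.getTo() - range.getFrom()) / desiredSplits);
      for (OffsetRange split : range.split(chunk, chunk)) {
        receiver.output(split);
      }
    }

    @ProcessElement
    public void process(ProcessContext c, OffsetRangeTracker tracker) {
      // ... claim offsets from the tracker and emit the lines in that range ...
    }
  }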
> Etienne
>
> On Mon, Jul 16, 2018 at 3:07 AM Etienne Chauchot <echauc...@apache.org>
> wrote:
>
> Hi,
> thanks Eugene for analyzing and sharing that.
> I have one comment inline
>
> Etienne
>
> On Sunday, July 15, 2018 at 14:20 -0700, Eugene Kirpichov wrote:
>
> Hey beamers,
>
> I've always wondered whether the BoundedSource implementations in the Beam
> SDK are worth their complexity, or whether they could rather be converted
> to the much easier-to-code ParDo style, which is also more modular and
> allows you to very easily implement readAll().
>
> There's a handful: file-based sources, BigQuery, Bigtable, HBase,
> Elasticsearch, MongoDB, Solr and a couple more.
>
> Curiously enough, BoundedSource vs. ParDo matters *only* on Dataflow,
> because AFAICT Dataflow is the only runner that cares about the things
> that BoundedSource can do and ParDo can't:
> - size estimation (used to choose an initial number of workers) [ok, Flink
> calls the function to return statistics, but doesn't seem to do anything
> else with it]
>
> => Spark uses size estimation to set the desired bundle size, with
> something like desiredBundleSize = estimatedSize / nbOfWorkersConfigured
> (partitions). See
> https://github.com/apache/beam/blob/a5634128d194161aebc8d03229fdaa1066cf7739/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java#L101
>
> - splitting into bundles of a given size (Dataflow chooses the number of
> bundles to create based on a simple formula that's not entirely unlike
> K*sqrt(size))
> - liquid sharding (splitAtFraction())
>
> If Dataflow didn't exist, there'd be no reason at all to use BoundedSource.
> So the question "which ones can be converted to ParDo" is really "which
> ones are used on Dataflow in ways that make these functions matter".
> Previously, my conservative assumption was that the answer is "all of
> them", but it turns out this is not so.
>
> Liquid sharding always matters; if a source is liquid-shardable, for now
> we have to keep it a source (until SDF gains liquid sharding - which
> should happen in a quarter or two, I think).
>
> Choosing the number of bundles to split into is easily done in SDK code;
> see https://github.com/apache/beam/pull/5886 for an example; DatastoreIO
> does something similar.
>
> The remaining thing to analyze is: when does initial scaling matter? As a
> member of the Dataflow team, I analyzed statistics of production Dataflow
> jobs over the past month. I cannot share my queries or the data, because
> they are proprietary to Google - so I am sharing just the general
> methodology and conclusions, because they matter to the Beam community. I
> looked at a few criteria, such as:
> - The job should be not too short and not too long: if it's too short,
> scaling couldn't have kicked in much at all; if it's too long, dynamic
> autoscaling would have been sufficient.
> - The job should use, at peak, at least a handful of workers (otherwise it
> wasn't run in settings where much scaling happened).
>
> After a couple more rounds of narrowing down, with some hand-checking that
> the results and criteria so far made sense, I ended up with nothing - no
> jobs that would have suffered a serious performance regression if their
> BoundedSource had not supported initial size estimation [except, of
> course, the liquid-shardable ones].
>
> Based on this, I would like to propose converting the following
> BoundedSource-based IOs to ParDo-based ones, and while we're at it,
> probably also adding readAll() versions (not necessarily in exactly the
> same PR):
> - ElasticsearchIO
> - SolrIO
> - MongoDbIO
> - MongoDbGridFSIO
> - CassandraIO
> - HCatalogIO
> - HadoopInputFormatIO
> - UnboundedToBoundedSourceAdapter (I already have a PR in progress for
> this one)
>
> These would not translate to a single ParDo - rather, they'd translate to
> ParDo(estimate size and split according to the formula), Reshuffle,
> ParDo(read data) - or possibly to a bounded SDF doing roughly the same
> (luckily, after https://github.com/apache/beam/pull/5940 all runners at
> master will support bounded SDF, so this is safe compatibility-wise).
> Pretty much like DatastoreIO does.
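To make that expansion concrete, the shape I have in mind is roughly the
following (SplitFn and ReadFn are made-up placeholder names for per-IO code,
not existing classes):

  PCollection<String> specs = ...;  // e.g. table names, queries, filepatterns
  specs
      // Estimate the size of each element's dataset and subdivide it into
      // shards, picking the shard count from the estimated size.
      .apply("Split", ParDo.of(new SplitFn()))
      // Materialize and redistribute the shards so that reading is not
      // fused with splitting and can use all workers.
      .apply("Reshuffle", Reshuffle.viaRandomKey())
      // Do the actual reading of each shard.
      .apply("Read", ParDo.of(new ReadFn()));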
> I would like to also propose to change the IO authoring guide
> https://beam.apache.org/documentation/io/authoring-overview/#when-to-implement-using-the-source-api
> to basically say "Never implement a new BoundedSource unless you can
> support liquid sharding". And add a utility for computing a desired number
> of splits.
>
> There might be some more details here to iron out, but I wanted to check
> with the community that this overall makes sense.
>
> Thanks.
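For the utility mentioned above, I'm imagining something as simple as this
(purely illustrative - the name and constants are made up, echoing the
K*sqrt(size) formula from earlier in the thread):

  // Illustrative sketch, not an existing Beam utility: a split count that
  // grows sub-linearly with the estimated size, in the spirit of K*sqrt(size).
  static int desiredNumSplits(long estimatedSizeBytes) {
    final double k = 0.05;        // tuning constant, made up for illustration
    final int maxSplits = 10000;  // safety cap, made up for illustration
    int n = (int) (k * Math.sqrt((double) estimatedSizeBytes));
    return Math.max(1, Math.min(maxSplits, n));
  }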