Re: Let's start getting rid of BoundedSource

2018-07-18 Thread Etienne Chauchot
On Tuesday, July 17, 2018 at 09:48 -0700, Eugene Kirpichov wrote:
> On Tue, Jul 17, 2018 at 2:49 AM Etienne Chauchot  wrote:
> > Hi Eugene
> > On Monday, July 16, 2018 at 07:52 -0700, Eugene Kirpichov wrote:
> > > Hi Etienne - thanks for catching this; indeed, I somehow missed that 
> > > actually several runners do this same thing -
> > > it seemed to me as something that can be done in user code (because it 
> > > involves combining estimated size + split
> > > in pretty much the same way), 
> > 
> > When you say "user code", you mean IO writer code as opposed to runner 
> > code, right?
> Correct: "user code" is what happens in the SDK or the user pipeline.
>  
> >  
> > 
> > 
> > > but I'm not so sure: even though many runners have a "desired 
> > > parallelism" option or alike, it's not all of them,
> > > so we can't use such an option universally.
> > 
> > Agree, cannot be universal
> > > Maybe then the right thing to do is to:
> > > - Use bounded SDFs for these
> > > - Change SDF @SplitRestriction API to take a desired number of splits as 
> > > a parameter, and introduce an API
> > > @EstimateOutputSizeBytes(element) valid only on bounded SDFs
> > Agree with the idea, but EstimateOutputSizeBytes must return the size of 
> > the dataset, not of an element.
> Please recall that the element here is e.g. a filename, or name of a BigTable 
> table, or something like that - i.e. the
> element describes the dataset, and the restriction describes what part of the 
> dataset.
> 
> If e.g. we have a PCollection of filenames and apply a ReadTextFn SDF 
> to it, and want the runner to know the
> total size of all files - the runner could insert some transforms to apply 
> EstimateOutputSize to each element and
> Sum.globally() them.

You're right, I misunderstood what you meant by element. The important thing 
is that the runner can know the size of the dataset at some point before 
calling @SplitRestriction, potentially via the Sum you mentioned.

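For concreteness, here is a minimal sketch of what the proposed bounded-SDF
API could look like for a file-reading SDF. The @EstimateOutputSizeBytes
annotation and the desiredSplits parameter are the proposal under discussion,
not an existing Beam API, so all names and signatures below are illustrative
only.

  import java.io.IOException;
  import java.nio.file.Files;
  import java.nio.file.Paths;
  import org.apache.beam.sdk.io.range.OffsetRange;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;

  // Hypothetical sketch: @EstimateOutputSizeBytes and the desiredSplits
  // parameter are the proposed additions, not part of the Beam SDK.
  class ReadTextFn extends DoFn<String, String> {

    @GetInitialRestriction
    public OffsetRange getInitialRestriction(String file) throws IOException {
      return new OffsetRange(0, Files.size(Paths.get(file)));
    }

    // Proposed: report how much data this element (a filename) describes,
    // so the runner can e.g. sum the estimates over the whole PCollection.
    @EstimateOutputSizeBytes
    public long estimateOutputSizeBytes(String file) throws IOException {
      return Files.size(Paths.get(file));
    }

    // Proposed: the runner passes in the number of splits it wants,
    // derived for example from the summed size estimates.
    @SplitRestriction
    public void splitRestriction(
        String file, OffsetRange range, int desiredSplits,
        OutputReceiver<OffsetRange> out) {
      long chunk = Math.max(1, (range.getTo() - range.getFrom()) / desiredSplits);
      for (OffsetRange split : range.split(chunk, chunk)) {
        out.output(split);
      }
    }

    @ProcessElement
    public void process(ProcessContext c, OffsetRangeTracker tracker) {
      // Claim offsets within the restriction and emit the corresponding
      // lines of the file...
    }
  }
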
>  
> > On some runners, each worker is given a fixed amount of heap. Thus, it is 
> > important that a runner be able to evaluate the size of the whole dataset 
> > to determine the size of each split (so splits fit in worker memory) and 
> > thus tell the bounded SDF the desired number of splits.
> > > - Add some plumbing to the standard bounded SDF expansion so that 
> > > different runners can compute that parameter
> > > differently, the two standard ways being "split into given number of 
> > > splits" or "split based on the sub-linear
> > > formula of estimated size".
> > > 
> > > I think this would work, though this is somewhat more work than I 
> > > anticipated. Any alternative ideas?
> > +1 It will be very similar for an IO developer (@EstimateOutputSizeBytes 
> > will be similar to
> > source.getEstimatedSizeBytes(), and @SplitRestriction(desiredSplits) 
> > similar to source.split(desiredBundleSize))
> Yeah I'm not sure this is actually a good thing that these APIs end up so 
> similar to the old ones - I was hoping we
> could come up with something better - but seems like there's no viable 
> alternative at this point :) 
> > Etienne
> > > On Mon, Jul 16, 2018 at 3:07 AM Etienne Chauchot wrote:
> > > > Hi, 
> > > > thanks Eugene for analyzing and sharing that.
> > > > I have one comment inline
> > > > 
> > > > Etienne
> > > > 
> > > > On Sunday, July 15, 2018 at 14:20 -0700, Eugene Kirpichov wrote:
> > > > > Hey beamers,
> > > > > I've always wondered whether the BoundedSource implementations in the 
> > > > > Beam SDK are worth their complexity, or
> > > > > whether they rather could be converted to the much easier to code 
> > > > > ParDo style, which is also more modular and
> > > > > allows you to very easily implement readAll().
> > > > > 
> > > > > There's a handful: file-based sources, BigQuery, Bigtable, HBase, 
> > > > > Elasticsearch, MongoDB, Solr and a couple
> > > > > more.
> > > > > 
> > > > > Curiously enough, BoundedSource vs. ParDo matters *only* on Dataflow, 
> > > > > because AFAICT Dataflow is the only
> > > > > runner that cares about the things that BoundedSource can do and 
> > > > > ParDo can't:
> > > > > - size estimation (used to choose an initial number of workers) [ok, 
> > > > > Flink calls the function to return
> > > > > statistics, but doesn't seem to do anything else with it]
> > > > 
> > > > => Spark uses size estimation to set desired bundle size with something 
> > > > like desiredBundleSize = estimatedSize /
> > > > nbOfWorkersConfigured (partitions)
> > > > See
> > > > https://github.com/apache/beam/blob/a5634128d194161aebc8d03229fdaa1066cf7739/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java#L101
> > > > 
> > > > 
> > > > > - splitting into bundles of given size (Dataflow chooses the number 
> > > > > of bundles to create based on a simple
> > > > > formula that's not entirely unlike K*sqrt(size))
> > > > > - liquid sharding (splitAtFraction())
> > > > > 
> > > > > 

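To make the Spark computation mentioned above concrete, here is a rough
paraphrase of what SourceRDD does at the linked line (not the literal runner
code; method and variable names are illustrative):

  import java.util.List;
  import org.apache.beam.sdk.io.BoundedSource;
  import org.apache.beam.sdk.options.PipelineOptions;

  // Rough paraphrase of the Spark runner's sizing logic: derive a
  // per-bundle size from the estimated total and the configured
  // parallelism, then ask the source to split itself accordingly.
  static <T> List<? extends BoundedSource<T>> splitLikeSpark(
      BoundedSource<T> source, int defaultParallelism, PipelineOptions options)
      throws Exception {
    long estimatedSize = source.getEstimatedSizeBytes(options);
    long desiredBundleSize = Math.max(1, estimatedSize / defaultParallelism);
    return source.split(desiredBundleSize, options);
  }
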
Re: Let's start getting rid of BoundedSource

2018-07-17 Thread Robert Bradshaw
On Sun, Jul 15, 2018 at 2:20 PM Eugene Kirpichov wrote:

> Hey beamers,
>
> I've always wondered whether the BoundedSource implementations in the Beam
> SDK are worth their complexity, or whether they rather could be converted
> to the much easier to code ParDo style, which is also more modular and
> allows you to very easily implement readAll().
>
> There's a handful: file-based sources, BigQuery, Bigtable, HBase,
> Elasticsearch, MongoDB, Solr and a couple more.
>
> Curiously enough, BoundedSource vs. ParDo matters *only* on Dataflow,
> because AFAICT Dataflow is the only runner that cares about the things that
> BoundedSource can do and ParDo can't:
> - size estimation (used to choose an initial number of workers) [ok, Flink
> calls the function to return statistics, but doesn't seem to do anything
> else with it]
> - splitting into bundles of given size (Dataflow chooses the number of
> bundles to create based on a simple formula that's not entirely unlike
> K*sqrt(size))
> - liquid sharding (splitAtFraction())
>
> If Dataflow didn't exist, there'd be no reason at all to use
> BoundedSource. So the question "which ones can be converted to ParDo" is
> really "which ones are used on Dataflow in ways that make these functions
> matter". Previously, my conservative assumption was that the answer is "all
> of them", but turns out this is not so.
>
> Liquid sharding always matters; if the source is liquid-shardable, for now
> we have to keep it a source (until SDF gains liquid sharding - which should
> happen in a quarter or two I think).
>
> Choosing number of bundles to split into is easily done in SDK code, see
> https://github.com/apache/beam/pull/5886 for example; DatastoreIO does
> something similar.
>
> The remaining thing to analyze is, when does initial scaling matter. So as
> a member of the Dataflow team, I analyzed statistics of production Dataflow
> jobs in the past month. I can not share my queries nor the data, because
> they are proprietary to Google - so I am sharing just the general
> methodology and conclusions, because they matter to the Beam community. I
> looked at a few criteria, such as:
> - The job should be not too short and not too long: if it's too short then
> scaling couldn't have kicked in much at all; if it's too long then dynamic
> autoscaling would have been sufficient.
> - The job should use, at peak, at least a handful of workers (otherwise
> means it wasn't used in settings where much scaling happened)
> After a couple more rounds of narrowing-down, with some hand-checking that
> the results and criteria so far make sense, I ended up with nothing - no
> jobs that would have suffered a serious performance regression if their
> BoundedSource had not supported initial size estimation [of course, except
> for the liquid-shardable ones].
>
> Based on this, I would like to propose to convert the following
> BoundedSource-based IOs to ParDo-based, and while we're at it, probably
> also add readAll() versions (not necessarily in exactly the same PR):
> - ElasticsearchIO
> - SolrIO
> - MongoDbIO
> - MongoDbGridFSIO
> - CassandraIO
> - HCatalogIO
> - HadoopInputFormatIO
> - UnboundedToBoundedSourceAdapter (already have a PR in progress for this
> one)
> These would not translate to a single ParDo - rather, they'd translate to
> ParDo(estimate size and split according to the formula), Reshuffle,
> ParDo(read data) - or possibly to a bounded SDF doing roughly the same
> (luckily after https://github.com/apache/beam/pull/5940 all runners at
> master will support bounded SDF so this is safe compatibility-wise). Pretty
> much like DatastoreIO does.
>

I think there is some value in having a single API that one implements,
rather than having every IO manually implement the ParDo + Reshuffle +
ParDo pattern. Until these convert over to SDFs, I'm not sure there's a net
win to manually converting them to ParDos rather than automatically
executing BoundedSources as ParDos. (There's also the sizing hooks that
have been pointed out, and even for those that don't yet support liquid
sharding, it'd be nice if, when someone wants to add liquid sharding to an
IO, they don't have to go in and massively restructure things.)

But perhaps I'm underestimating the complexity of using the BoundedSource
API vs. manually writing a sequence of ParDos. Or is it that existing
BoundedSources don't lend themselves well to readAll? (Would a readAll
extends PTransform<PCollection<BoundedSource<T>>, PCollection<T>> be that
hard? Specific sources could implement a nicer ReadAll that is built on
this.)

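For reference, the generic transform being asked about could be sketched
roughly as below. SplitSourceFn and ReadFromSourceFn are hypothetical
placeholders, and a real version would also need a coder for the sources
(e.g. SerializableCoder):

  import org.apache.beam.sdk.io.BoundedSource;
  import org.apache.beam.sdk.transforms.PTransform;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.transforms.Reshuffle;
  import org.apache.beam.sdk.values.PCollection;

  // Hypothetical generic readAll over BoundedSource (not an existing Beam
  // transform): split each source into sub-sources, redistribute them,
  // then read each sub-source via its reader.
  public class ReadAllFromSource<T>
      extends PTransform<PCollection<BoundedSource<T>>, PCollection<T>> {
    @Override
    public PCollection<T> expand(PCollection<BoundedSource<T>> sources) {
      return sources
          .apply("Split", ParDo.of(new SplitSourceFn<T>()))
          .apply("Reshuffle", Reshuffle.viaRandomKey())
          .apply("Read", ParDo.of(new ReadFromSourceFn<T>()));
    }
  }
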
> I would like to also propose to change the IO authoring guide
> https://beam.apache.org/documentation/io/authoring-overview/#when-to-implement-using-the-source-api
>  to
> basically say "Never implement a new BoundedSource unless you can support
> liquid sharding". And add a utility for computing a desired number of
> splits.
>
> There might be some more details here to iron out, but I wanted to check
> with the community that this overall makes sense.
>
> 

Re: Let's start getting rid of BoundedSource

2018-07-17 Thread Eugene Kirpichov
On Tue, Jul 17, 2018 at 2:49 AM Etienne Chauchot wrote:

> Hi Eugene
>
> On Monday, July 16, 2018 at 07:52 -0700, Eugene Kirpichov wrote:
>
> Hi Etienne - thanks for catching this; indeed, I somehow missed that
> actually several runners do this same thing - it seemed to me as something
> that can be done in user code (because it involves combining estimated size
> + split in pretty much the same way),
>
>
> When you say "user code", you mean IO writer code as opposed to runner
> code, right?
>
Correct: "user code" is what happens in the SDK or the user pipeline.


>
>
> but I'm not so sure: even though many runners have a "desired parallelism"
> option or alike, it's not all of them, so we can't use such an option
> universally.
>
>
> Agree, cannot be universal
>
>
> Maybe then the right thing to do is to:
> - Use bounded SDFs for these
> - Change SDF @SplitRestriction API to take a desired number of splits as a
> parameter, and introduce an API @EstimateOutputSizeBytes(element) valid
> only on bounded SDFs
>
> Agree with the idea, but EstimateOutputSizeBytes must return the size of
> the dataset, not of an element.
>
Please recall that the element here is e.g. a filename, or name of a
BigTable table, or something like that - i.e. the element describes the
dataset, and the restriction describes what part of the dataset.

If e.g. we have a PCollection of filenames and apply a ReadTextFn
SDF to it, and want the runner to know the total size of all files - the
runner could insert some transforms to apply EstimateOutputSize to each
element and Sum.globally() them.

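A sketch of that runner-side plumbing, with EstimateOutputSizeFn standing in
for the proposed per-element estimator (Sum.longsGlobally() is the concrete
form of the Sum.globally() mentioned above):

  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.transforms.Sum;
  import org.apache.beam.sdk.values.PCollection;

  // Estimate the size described by each element (e.g. a filename), then
  // sum the estimates into a single total for the whole dataset.
  // EstimateOutputSizeFn is a hypothetical stand-in for the proposed API.
  PCollection<Long> totalSizeBytes =
      filenames
          .apply("EstimateEach", ParDo.of(new EstimateOutputSizeFn()))
          .apply("SumSizes", Sum.longsGlobally());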

> On some runners, each worker is given a fixed amount of heap. Thus, it is
> important that a runner be able to evaluate the size of the whole dataset to
> determine the size of each split (so splits fit in worker memory) and thus
> tell the bounded SDF the desired number of splits.
>
> - Add some plumbing to the standard bounded SDF expansion so that
> different runners can compute that parameter differently, the two standard
> ways being "split into given number of splits" or "split based on the
> sub-linear formula of estimated size".
>
> I think this would work, though this is somewhat more work than I
> anticipated. Any alternative ideas?
>
> +1 It will be very similar for an IO developer (@EstimateOutputSizeBytes
> will be similar to source.getEstimatedSizeBytes(),
> and @SplitRestriction(desiredSplits) similar to
> source.split(desiredBundleSize))
>
Yeah I'm not sure this is actually a good thing that these APIs end up so
similar to the old ones - I was hoping we could come up with something
better - but seems like there's no viable alternative at this point :)


>
> Etienne
>
>
> On Mon, Jul 16, 2018 at 3:07 AM Etienne Chauchot wrote:
>
> Hi,
> thanks Eugene for analyzing and sharing that.
> I have one comment inline
>
> Etienne
>
> On Sunday, July 15, 2018 at 14:20 -0700, Eugene Kirpichov wrote:
>
> Hey beamers,
>
> I've always wondered whether the BoundedSource implementations in the Beam
> SDK are worth their complexity, or whether they rather could be converted
> to the much easier to code ParDo style, which is also more modular and
> allows you to very easily implement readAll().
>
> There's a handful: file-based sources, BigQuery, Bigtable, HBase,
> Elasticsearch, MongoDB, Solr and a couple more.
>
> Curiously enough, BoundedSource vs. ParDo matters *only* on Dataflow,
> because AFAICT Dataflow is the only runner that cares about the things that
> BoundedSource can do and ParDo can't:
> - size estimation (used to choose an initial number of workers) [ok, Flink
> calls the function to return statistics, but doesn't seem to do anything
> else with it]
>
> => Spark uses size estimation to set desired bundle size with something
> like desiredBundleSize = estimatedSize / nbOfWorkersConfigured (partitions)
> See
> https://github.com/apache/beam/blob/a5634128d194161aebc8d03229fdaa1066cf7739/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java#L101
>
>
> - splitting into bundles of given size (Dataflow chooses the number of
> bundles to create based on a simple formula that's not entirely unlike
> K*sqrt(size))
> - liquid sharding (splitAtFraction())
>
> If Dataflow didn't exist, there'd be no reason at all to use
> BoundedSource. So the question "which ones can be converted to ParDo" is
> really "which ones are used on Dataflow in ways that make these functions
> matter". Previously, my conservative assumption was that the answer is "all
> of them", but turns out this is not so.
>
> Liquid sharding always matters; if the source is liquid-shardable, for now
> we have to keep it a source (until SDF gains liquid sharding - which should
> happen in a quarter or two I think).
>
> Choosing number of bundles to split into is easily done in SDK code, see
> https://github.com/apache/beam/pull/5886 for example; DatastoreIO does
> something similar.
>
> The remaining thing to analyze is, when does initial scaling matter. 

Re: Let's start getting rid of BoundedSource

2018-07-17 Thread Etienne Chauchot
Hi Eugene
On Monday, July 16, 2018 at 07:52 -0700, Eugene Kirpichov wrote:
> Hi Etienne - thanks for catching this; indeed, I somehow missed that actually 
> several runners do this same thing - it
> seemed to me as something that can be done in user code (because it involves 
> combining estimated size + split in
> pretty much the same way), 

When you say "user code", you mean IO writer code as opposed to runner code, 
right?


> but I'm not so sure: even though many runners have a "desired parallelism" 
> option or alike, it's not all of them, so
> we can't use such an option universally.

Agree, cannot be universal
> Maybe then the right thing to do is to:
> - Use bounded SDFs for these
> - Change SDF @SplitRestriction API to take a desired number of splits as a 
> parameter, and introduce an API
> @EstimateOutputSizeBytes(element) valid only on bounded SDFs
Agree with the idea, but EstimateOutputSizeBytes must return the size of the 
dataset, not of an element. On some runners, each worker is given a fixed 
amount of heap. Thus, it is important that a runner be able to evaluate the 
size of the whole dataset to determine the size of each split (so splits fit 
in worker memory) and thus tell the bounded SDF the desired number of splits.
> - Add some plumbing to the standard bounded SDF expansion so that different 
> runners can compute that parameter
> differently, the two standard ways being "split into given number of splits" 
> or "split based on the sub-linear formula
> of estimated size".
> 
> I think this would work, though this is somewhat more work than I 
> anticipated. Any alternative ideas?
+1 It will be very similar for an IO developer (@EstimateOutputSizeBytes will 
be similar to
source.getEstimatedSizeBytes(), and @SplitRestriction(desiredSplits) similar to 
source.split(desiredBundleSize))
Etienne
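
A small arithmetic sketch of the memory-based sizing Etienne describes above;
maxBytesPerSplit is a hypothetical knob that would be derived from the
configured worker heap:

  // Illustrative only: pick the number of splits so that each split fits
  // within a worker's memory budget (ceiling division).
  static int desiredSplits(long totalDatasetBytes, long maxBytesPerSplit) {
    long splits = (totalDatasetBytes + maxBytesPerSplit - 1) / maxBytesPerSplit;
    return (int) Math.min(Math.max(1L, splits), Integer.MAX_VALUE);
  }
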
> On Mon, Jul 16, 2018 at 3:07 AM Etienne Chauchot  wrote:
> > Hi, 
> > thanks Eugene for analyzing and sharing that.
> > I have one comment inline
> > 
> > Etienne
> > 
> > On Sunday, July 15, 2018 at 14:20 -0700, Eugene Kirpichov wrote:
> > > Hey beamers,
> > > I've always wondered whether the BoundedSource implementations in the 
> > > Beam SDK are worth their complexity, or
> > > whether they rather could be converted to the much easier to code ParDo 
> > > style, which is also more modular and
> > > allows you to very easily implement readAll().
> > > 
> > > There's a handful: file-based sources, BigQuery, Bigtable, HBase, 
> > > Elasticsearch, MongoDB, Solr and a couple more.
> > > 
> > > Curiously enough, BoundedSource vs. ParDo matters *only* on Dataflow, 
> > > because AFAICT Dataflow is the only runner
> > > that cares about the things that BoundedSource can do and ParDo can't:
> > > - size estimation (used to choose an initial number of workers) [ok, 
> > > Flink calls the function to return
> > > statistics, but doesn't seem to do anything else with it]
> > 
> > => Spark uses size estimation to set desired bundle size with something 
> > like desiredBundleSize = estimatedSize /
> > nbOfWorkersConfigured (partitions)
> > See 
> > https://github.com/apache/beam/blob/a5634128d194161aebc8d03229fdaa1066cf7739/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java#L101
> > 
> > 
> > > - splitting into bundles of given size (Dataflow chooses the number of 
> > > bundles to create based on a simple formula
> > > that's not entirely unlike K*sqrt(size))
> > > - liquid sharding (splitAtFraction())
> > > 
> > > If Dataflow didn't exist, there'd be no reason at all to use 
> > > BoundedSource. So the question "which ones can be
> > > converted to ParDo" is really "which ones are used on Dataflow in ways 
> > > that make these functions matter".
> > > Previously, my conservative assumption was that the answer is "all of 
> > > them", but turns out this is not so.
> > > 
> > > Liquid sharding always matters; if the source is liquid-shardable, for 
> > > now we have to keep it a source (until SDF
> > > gains liquid sharding - which should happen in a quarter or two I think).
> > > 
> > > Choosing number of bundles to split into is easily done in SDK code, see 
> > > https://github.com/apache/beam/pull/5886 
> > > for example; DatastoreIO does something similar.
> > > 
> > > The remaining thing to analyze is, when does initial scaling matter. So 
> > > as a member of the Dataflow team, I
> > > analyzed statistics of production Dataflow jobs in the past month. I can 
> > > not share my queries nor the data,
> > > because they are proprietary to Google - so I am sharing just the general 
> > > methodology and conclusions, because
> > > they matter to the Beam community. I looked at a few criteria, such as:
> > > - The job should be not too short and not too long: if it's too short 
> > > then scaling couldn't have kicked in much at
> > > all; if it's too long then dynamic autoscaling would have been sufficient.
> > > - The job should use, at peak, at least a handful of 

Re: Let's start getting rid of BoundedSource

2018-07-16 Thread Eugene Kirpichov
Hey all,

The PR https://github.com/apache/beam/pull/5940 was merged, and now all
runners at "master" support bounded-per-element SDFs!
Thanks +Ismaël Mejía for the reviews.
I have updated the Capability Matrix as well:
https://beam.apache.org/documentation/runners/capability-matrix/


On Mon, Jul 16, 2018 at 7:56 AM Jean-Baptiste Onofré 
wrote:

> Hi guys,
>
> I think the purpose of SDF is to simplify BoundedSource-style writing.
>
> I agree that extending @SplitRestriction is a good approach.
>
> Regards
> JB
>
> On 16/07/2018 16:52, Eugene Kirpichov wrote:
> > Hi Etienne - thanks for catching this; indeed, I somehow missed that
> > actually several runners do this same thing - it seemed to me as
> > something that can be done in user code (because it involves combining
> > estimated size + split in pretty much the same way), but I'm not so
> > sure: even though many runners have a "desired parallelism" option or
> > alike, it's not all of them, so we can't use such an option universally.
> >
> > Maybe then the right thing to do is to:
> > - Use bounded SDFs for these
> > - Change SDF @SplitRestriction API to take a desired number of splits as
> > a parameter, and introduce an API @EstimateOutputSizeBytes(element)
> > valid only on bounded SDFs
> > - Add some plumbing to the standard bounded SDF expansion so that
> > different runners can compute that parameter differently, the two
> > standard ways being "split into given number of splits" or "split based
> > on the sub-linear formula of estimated size".
> >
> > I think this would work, though this is somewhat more work than I
> > anticipated. Any alternative ideas?
> >
> > On Mon, Jul 16, 2018 at 3:07 AM Etienne Chauchot wrote:
> >
> > Hi,
> > thanks Eugene for analyzing and sharing that.
> > I have one comment inline
> >
> > Etienne
> >
> > On Sunday, July 15, 2018 at 14:20 -0700, Eugene Kirpichov wrote:
> >> Hey beamers,
> >>
> >> I've always wondered whether the BoundedSource implementations in
> >> the Beam SDK are worth their complexity, or whether they rather
> >> could be converted to the much easier to code ParDo style, which
> >> is also more modular and allows you to very easily implement
> >> readAll().
> >>
> >> There's a handful: file-based sources, BigQuery, Bigtable, HBase,
> >> Elasticsearch, MongoDB, Solr and a couple more.
> >>
> >> Curiously enough, BoundedSource vs. ParDo matters *only* on
> >> Dataflow, because AFAICT Dataflow is the only runner that cares
> >> about the things that BoundedSource can do and ParDo can't:
> >> - size estimation (used to choose an initial number of workers)
> >> [ok, Flink calls the function to return statistics, but doesn't
> >> seem to do anything else with it]
> > => Spark uses size estimation to set desired bundle size with
> > something like desiredBundleSize = estimatedSize /
> > nbOfWorkersConfigured (partitions)
> > See
> >
> https://github.com/apache/beam/blob/a5634128d194161aebc8d03229fdaa1066cf7739/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java#L101
> >
> >
> >> - splitting into bundles of given size (Dataflow chooses the
> >> number of bundles to create based on a simple formula that's not
> >> entirely unlike K*sqrt(size))
> >> - liquid sharding (splitAtFraction())
> >>
> >> If Dataflow didn't exist, there'd be no reason at all to use
> >> BoundedSource. So the question "which ones can be converted to
> >> ParDo" is really "which ones are used on Dataflow in ways that
> >> make these functions matter". Previously, my conservative
> >> assumption was that the answer is "all of them", but turns out
> >> this is not so.
> >>
> >> Liquid sharding always matters; if the source is liquid-shardable,
> >> for now we have to keep it a source (until SDF gains liquid
> >> sharding - which should happen in a quarter or two I think).
> >>
> >> Choosing number of bundles to split into is easily done in SDK
> >> code, see https://github.com/apache/beam/pull/5886 for example;
> >> DatastoreIO does something similar.
> >>
> >> The remaining thing to analyze is, when does initial scaling
> >> matter. So as a member of the Dataflow team, I analyzed statistics
> >> of production Dataflow jobs in the past month. I can not share my
> >> queries nor the data, because they are proprietary to Google - so
> >> I am sharing just the general methodology and conclusions, because
> >> they matter to the Beam community. I looked at a few criteria,
> >> such as:
> >> - The job should be not too short and not too long: if it's too
> >> short then scaling couldn't have kicked in much at all; if it's
> >> too long then dynamic autoscaling would have been sufficient.
> >> - The job should use, at peak, at least a handful of workers
> >> (otherwise 

Re: Let's start getting rid of BoundedSource

2018-07-16 Thread Jean-Baptiste Onofré
Hi guys,

I think the purpose of SDF is to simplify BoundedSource-style writing.

I agree that extending @SplitRestriction is a good approach.

Regards
JB

On 16/07/2018 16:52, Eugene Kirpichov wrote:
> Hi Etienne - thanks for catching this; indeed, I somehow missed that
> actually several runners do this same thing - it seemed to me as
> something that can be done in user code (because it involves combining
> estimated size + split in pretty much the same way), but I'm not so
> sure: even though many runners have a "desired parallelism" option or
> alike, it's not all of them, so we can't use such an option universally.
> 
> Maybe then the right thing to do is to:
> - Use bounded SDFs for these
> - Change SDF @SplitRestriction API to take a desired number of splits as
> a parameter, and introduce an API @EstimateOutputSizeBytes(element)
> valid only on bounded SDFs
> - Add some plumbing to the standard bounded SDF expansion so that
> different runners can compute that parameter differently, the two
> standard ways being "split into given number of splits" or "split based
> on the sub-linear formula of estimated size".
> 
> I think this would work, though this is somewhat more work than I
> anticipated. Any alternative ideas?
> 
> On Mon, Jul 16, 2018 at 3:07 AM Etienne Chauchot wrote:
> 
> Hi,
> thanks Eugene for analyzing and sharing that.
> I have one comment inline
> 
> Etienne
> 
> On Sunday, July 15, 2018 at 14:20 -0700, Eugene Kirpichov wrote:
>> Hey beamers,
>>
>> I've always wondered whether the BoundedSource implementations in
>> the Beam SDK are worth their complexity, or whether they rather
>> could be converted to the much easier to code ParDo style, which
>> is also more modular and allows you to very easily implement
>> readAll().
>>
>> There's a handful: file-based sources, BigQuery, Bigtable, HBase,
>> Elasticsearch, MongoDB, Solr and a couple more.
>>
>> Curiously enough, BoundedSource vs. ParDo matters *only* on
>> Dataflow, because AFAICT Dataflow is the only runner that cares
>> about the things that BoundedSource can do and ParDo can't:
>> - size estimation (used to choose an initial number of workers)
>> [ok, Flink calls the function to return statistics, but doesn't
>> seem to do anything else with it]
> => Spark uses size estimation to set desired bundle size with
> something like desiredBundleSize = estimatedSize /
> nbOfWorkersConfigured (partitions)
> See
> 
> https://github.com/apache/beam/blob/a5634128d194161aebc8d03229fdaa1066cf7739/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java#L101
> 
> 
>> - splitting into bundles of given size (Dataflow chooses the
>> number of bundles to create based on a simple formula that's not
>> entirely unlike K*sqrt(size))
>> - liquid sharding (splitAtFraction())
>>
>> If Dataflow didn't exist, there'd be no reason at all to use
>> BoundedSource. So the question "which ones can be converted to
>> ParDo" is really "which ones are used on Dataflow in ways that
>> make these functions matter". Previously, my conservative
>> assumption was that the answer is "all of them", but turns out
>> this is not so.
>>
>> Liquid sharding always matters; if the source is liquid-shardable,
>> for now we have to keep it a source (until SDF gains liquid
>> sharding - which should happen in a quarter or two I think).
>>
>> Choosing number of bundles to split into is easily done in SDK
>> code, see https://github.com/apache/beam/pull/5886 for example;
>> DatastoreIO does something similar.
>>
>> The remaining thing to analyze is, when does initial scaling
>> matter. So as a member of the Dataflow team, I analyzed statistics
>> of production Dataflow jobs in the past month. I can not share my
>> queries nor the data, because they are proprietary to Google - so
>> I am sharing just the general methodology and conclusions, because
>> they matter to the Beam community. I looked at a few criteria,
>> such as:
>> - The job should be not too short and not too long: if it's too
>> short then scaling couldn't have kicked in much at all; if it's
>> too long then dynamic autoscaling would have been sufficient.
>> - The job should use, at peak, at least a handful of workers
>> (otherwise means it wasn't used in settings where much scaling
>> happened)
>> After a couple more rounds of narrowing-down, with some
>> hand-checking that the results and criteria so far make sense, I
>> ended up with nothing - no jobs that would have suffered a serious
>> performance regression if their BoundedSource had not supported
>> initial size estimation [of course, except for the
>> liquid-shardable ones].
>>
>> Based on this, I would like to propose to convert the following
>> 

Re: Let's start getting rid of BoundedSource

2018-07-16 Thread Eugene Kirpichov
Hi Etienne - thanks for catching this; indeed, I somehow missed that
actually several runners do this same thing - it seemed to me as something
that can be done in user code (because it involves combining estimated size
+ split in pretty much the same way), but I'm not so sure: even though many
runners have a "desired parallelism" option or alike, it's not all of them,
so we can't use such an option universally.

Maybe then the right thing to do is to:
- Use bounded SDFs for these
- Change SDF @SplitRestriction API to take a desired number of splits as a
parameter, and introduce an API @EstimateOutputSizeBytes(element) valid
only on bounded SDFs
- Add some plumbing to the standard bounded SDF expansion so that different
runners can compute that parameter differently, the two standard ways being
"split into given number of splits" or "split based on the sub-linear
formula of estimated size".

I think this would work, though this is somewhat more work than I
anticipated. Any alternative ideas?

On Mon, Jul 16, 2018 at 3:07 AM Etienne Chauchot wrote:

> Hi,
> thanks Eugene for analyzing and sharing that.
> I have one comment inline
>
> Etienne
>
> On Sunday, July 15, 2018 at 14:20 -0700, Eugene Kirpichov wrote:
>
> Hey beamers,
>
> I've always wondered whether the BoundedSource implementations in the Beam
> SDK are worth their complexity, or whether they rather could be converted
> to the much easier to code ParDo style, which is also more modular and
> allows you to very easily implement readAll().
>
> There's a handful: file-based sources, BigQuery, Bigtable, HBase,
> Elasticsearch, MongoDB, Solr and a couple more.
>
> Curiously enough, BoundedSource vs. ParDo matters *only* on Dataflow,
> because AFAICT Dataflow is the only runner that cares about the things that
> BoundedSource can do and ParDo can't:
> - size estimation (used to choose an initial number of workers) [ok, Flink
> calls the function to return statistics, but doesn't seem to do anything
> else with it]
>
> => Spark uses size estimation to set desired bundle size with something
> like desiredBundleSize = estimatedSize / nbOfWorkersConfigured (partitions)
> See
> https://github.com/apache/beam/blob/a5634128d194161aebc8d03229fdaa1066cf7739/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java#L101
>
>
> - splitting into bundles of given size (Dataflow chooses the number of
> bundles to create based on a simple formula that's not entirely unlike
> K*sqrt(size))
> - liquid sharding (splitAtFraction())
>
> If Dataflow didn't exist, there'd be no reason at all to use
> BoundedSource. So the question "which ones can be converted to ParDo" is
> really "which ones are used on Dataflow in ways that make these functions
> matter". Previously, my conservative assumption was that the answer is "all
> of them", but turns out this is not so.
>
> Liquid sharding always matters; if the source is liquid-shardable, for now
> we have to keep it a source (until SDF gains liquid sharding - which should
> happen in a quarter or two I think).
>
> Choosing number of bundles to split into is easily done in SDK code, see
> https://github.com/apache/beam/pull/5886 for example; DatastoreIO does
> something similar.
>
> The remaining thing to analyze is, when does initial scaling matter. So as
> a member of the Dataflow team, I analyzed statistics of production Dataflow
> jobs in the past month. I can not share my queries nor the data, because
> they are proprietary to Google - so I am sharing just the general
> methodology and conclusions, because they matter to the Beam community. I
> looked at a few criteria, such as:
> - The job should be not too short and not too long: if it's too short then
> scaling couldn't have kicked in much at all; if it's too long then dynamic
> autoscaling would have been sufficient.
> - The job should use, at peak, at least a handful of workers (otherwise
> means it wasn't used in settings where much scaling happened)
> After a couple more rounds of narrowing-down, with some hand-checking that
> the results and criteria so far make sense, I ended up with nothing - no
> jobs that would have suffered a serious performance regression if their
> BoundedSource had not supported initial size estimation [of course, except
> for the liquid-shardable ones].
>
> Based on this, I would like to propose to convert the following
> BoundedSource-based IOs to ParDo-based, and while we're at it, probably
> also add readAll() versions (not necessarily in exactly the same PR):
> - ElasticsearchIO
> - SolrIO
> - MongoDbIO
> - MongoDbGridFSIO
> - CassandraIO
> - HCatalogIO
> - HadoopInputFormatIO
> - UnboundedToBoundedSourceAdapter (already have a PR in progress for this
> one)
> These would not translate to a single ParDo - rather, they'd translate to
> ParDo(estimate size and split according to the formula), Reshuffle,
> ParDo(read data) - or possibly to a bounded SDF doing roughly the same
> (luckily after 

Re: Let's start getting rid of BoundedSource

2018-07-16 Thread Etienne Chauchot
Hi, thanks Eugene for analyzing and sharing that. I have one comment inline.
Etienne
On Sunday, July 15, 2018 at 14:20 -0700, Eugene Kirpichov wrote:
> Hey beamers,
> I've always wondered whether the BoundedSource implementations in the Beam 
> SDK are worth their complexity, or whether
> they rather could be converted to the much easier to code ParDo style, which 
> is also more modular and allows you to
> very easily implement readAll().
> 
> There's a handful: file-based sources, BigQuery, Bigtable, HBase, 
> Elasticsearch, MongoDB, Solr and a couple more.
> 
> Curiously enough, BoundedSource vs. ParDo matters *only* on Dataflow, because 
> AFAICT Dataflow is the only runner that
> cares about the things that BoundedSource can do and ParDo can't:
> - size estimation (used to choose an initial number of workers) [ok, Flink 
> calls the function to return statistics,
> but doesn't seem to do anything else with it]
=> Spark uses size estimation to set desired bundle size with something like 
desiredBundleSize = estimatedSize / nbOfWorkersConfigured (partitions). See 
https://github.com/apache/beam/blob/a5634128d194161aebc8d03229fdaa1066cf7739/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java#L101

> - splitting into bundles of given size (Dataflow chooses the number of 
> bundles to create based on a simple formula
> that's not entirely unlike K*sqrt(size))
> - liquid sharding (splitAtFraction())
> 
> If Dataflow didn't exist, there'd be no reason at all to use BoundedSource. 
> So the question "which ones can be
> converted to ParDo" is really "which ones are used on Dataflow in ways that 
> make these functions matter". Previously,
> my conservative assumption was that the answer is "all of them", but turns 
> out this is not so.
> 
> Liquid sharding always matters; if the source is liquid-shardable, for now we 
> have to keep it a source (until SDF
> gains liquid sharding - which should happen in a quarter or two I think).
> 
> Choosing number of bundles to split into is easily done in SDK code, see 
> https://github.com/apache/beam/pull/5886 for
> example; DatastoreIO does something similar.
> 
> The remaining thing to analyze is, when does initial scaling matter. So as a 
> member of the Dataflow team, I analyzed
> statistics of production Dataflow jobs in the past month. I can not share my 
> queries nor the data, because they are
> proprietary to Google - so I am sharing just the general methodology and 
> conclusions, because they matter to the Beam
> community. I looked at a few criteria, such as:
> - The job should be not too short and not too long: if it's too short then 
> scaling couldn't have kicked in much at
> all; if it's too long then dynamic autoscaling would have been sufficient.
> - The job should use, at peak, at least a handful of workers (otherwise means 
> it wasn't used in settings where much
> scaling happened)
> After a couple more rounds of narrowing-down, with some hand-checking that 
> the results and criteria so far make sense,
> I ended up with nothing - no jobs that would have suffered a serious 
> performance regression if their BoundedSource had
> not supported initial size estimation [of course, except for the 
> liquid-shardable ones].
> 
> Based on this, I would like to propose to convert the following 
> BoundedSource-based IOs to ParDo-based, and while
> we're at it, probably also add readAll() versions (not necessarily in exactly 
> the same PR):
> - ElasticsearchIO
> - SolrIO
> - MongoDbIO
> - MongoDbGridFSIO
> - CassandraIO
> - HCatalogIO
> - HadoopInputFormatIO
> - UnboundedToBoundedSourceAdapter (already have a PR in progress for this one)
> These would not translate to a single ParDo - rather, they'd translate to 
> ParDo(estimate size and split according to
> the formula), Reshuffle, ParDo(read data) - or possibly to a bounded SDF 
> doing roughly the same (luckily after
> https://github.com/apache/beam/pull/5940 all runners at master will support bounded 
> SDF so this is safe compatibility-wise).
> Pretty much like DatastoreIO does.
> 
> I would like to also propose to change the IO authoring guide 
> https://beam.apache.org/documentation/io/authoring-overview/#when-to-implement-using-the-source-api
> to basically say "Never implement 
> a new BoundedSource unless you can
> support liquid sharding". And add a utility for computing a desired number of 
> splits.
> 
> There might be some more details here to iron out, but I wanted to check with 
> the community that this overall makes
> sense.
> 
> Thanks.

Let's start getting rid of BoundedSource

2018-07-15 Thread Eugene Kirpichov
Hey beamers,

I've always wondered whether the BoundedSource implementations in the Beam
SDK are worth their complexity, or whether they rather could be converted
to the much easier to code ParDo style, which is also more modular and
allows you to very easily implement readAll().

There's a handful: file-based sources, BigQuery, Bigtable, HBase,
Elasticsearch, MongoDB, Solr and a couple more.

Curiously enough, BoundedSource vs. ParDo matters *only* on Dataflow,
because AFAICT Dataflow is the only runner that cares about the things that
BoundedSource can do and ParDo can't:
- size estimation (used to choose an initial number of workers) [ok, Flink
calls the function to return statistics, but doesn't seem to do anything
else with it]
- splitting into bundles of given size (Dataflow chooses the number of
bundles to create based on a simple formula that's not entirely unlike
K*sqrt(size))
- liquid sharding (splitAtFraction())

If Dataflow didn't exist, there'd be no reason at all to use BoundedSource.
So the question "which ones can be converted to ParDo" is really "which
ones are used on Dataflow in ways that make these functions matter".
Previously, my conservative assumption was that the answer is "all of
them", but turns out this is not so.

Liquid sharding always matters; if the source is liquid-shardable, for now
we have to keep it a source (until SDF gains liquid sharding - which should
happen in a quarter or two I think).

Choosing number of bundles to split into is easily done in SDK code, see
https://github.com/apache/beam/pull/5886 for example; DatastoreIO does
something similar.
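
As an illustration of choosing the number of bundles in SDK code, here is a
sub-linear heuristic of the K*sqrt(size) flavor; the constants are invented
for illustration and are not Dataflow's:

  // Illustrative only: a sub-linear split-count heuristic. The constants
  // are made up, not taken from any runner.
  static int desiredNumSplits(long estimatedSizeBytes) {
    double sizeMiB = estimatedSizeBytes / (1024.0 * 1024.0);
    int splits = (int) Math.ceil(4.0 * Math.sqrt(sizeMiB)); // hypothetical K = 4
    return Math.max(1, Math.min(splits, 10000)); // clamp to sane bounds
  }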

The remaining thing to analyze is, when does initial scaling matter. So as
a member of the Dataflow team, I analyzed statistics of production Dataflow
jobs in the past month. I can not share my queries nor the data, because
they are proprietary to Google - so I am sharing just the general
methodology and conclusions, because they matter to the Beam community. I
looked at a few criteria, such as:
- The job should be not too short and not too long: if it's too short then
scaling couldn't have kicked in much at all; if it's too long then dynamic
autoscaling would have been sufficient.
- The job should use, at peak, at least a handful of workers (otherwise
means it wasn't used in settings where much scaling happened)
After a couple more rounds of narrowing-down, with some hand-checking that
the results and criteria so far make sense, I ended up with nothing - no
jobs that would have suffered a serious performance regression if their
BoundedSource had not supported initial size estimation [of course, except
for the liquid-shardable ones].

Based on this, I would like to propose to convert the following
BoundedSource-based IOs to ParDo-based, and while we're at it, probably
also add readAll() versions (not necessarily in exactly the same PR):
- ElasticsearchIO
- SolrIO
- MongoDbIO
- MongoDbGridFSIO
- CassandraIO
- HCatalogIO
- HadoopInputFormatIO
- UnboundedToBoundedSourceAdapter (already have a PR in progress for this
one)
These would not translate to a single ParDo - rather, they'd translate to
ParDo(estimate size and split according to the formula), Reshuffle,
ParDo(read data) - or possibly to a bounded SDF doing roughly the same
(luckily after https://github.com/apache/beam/pull/5940 all runners at
master will support bounded SDF so this is safe compatibility-wise). Pretty
much like DatastoreIO does.

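A minimal sketch of that expansion; EstimateAndSplitFn and ReadSplitFn are
hypothetical DoFns standing in for each IO's own size-estimation and read
logic:

  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.transforms.Reshuffle;
  import org.apache.beam.sdk.values.PCollection;

  // Sketch of the ParDo + Reshuffle + ParDo shape described above;
  // Query and Record are placeholder element types.
  PCollection<Record> records =
      queries
          .apply("EstimateAndSplit", ParDo.of(new EstimateAndSplitFn()))
          .apply("Reshuffle", Reshuffle.viaRandomKey())
          .apply("Read", ParDo.of(new ReadSplitFn()));
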
I would like to also propose to change the IO authoring guide
https://beam.apache.org/documentation/io/authoring-overview/#when-to-implement-using-the-source-api
to
basically say "Never implement a new BoundedSource unless you can support
liquid sharding". And add a utility for computing a desired number of
splits.

There might be some more details here to iron out, but I wanted to check
with the community that this overall makes sense.

Thanks.