Re: Add Sorting Class?

2016-05-27 Thread Robert Bradshaw
Totally agree that ordering values within a key[-window[-pane]] grouping
is quite useful, and it makes total sense in the model (primarily because
elements themselves are never partitioned).

On Fri, May 27, 2016 at 11:31 AM, Bobby Evans wrote:
> If you have local ordering within a group-by like what Frances said, you can
> build a global total ordering off of it by partitioning the data into ranges.
> Most TeraSort implementations do this. They subsample the data to get a
> fairly evenly distributed set of key ranges, and then do individual smaller
> sorts that are written out to ordered files.
>
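The range-partitioning approach described above can be sketched in plain Java, outside Beam. This is a single-process illustration under assumptions that are not from the thread: keys are `long`s, split points are quantiles of a small random sample, and the class and method names (`RangePartitionSort`, `chooseSplitPoints`, `partitionFor`, `sortByRanges`) are hypothetical. Each range is sorted on its own, and because no key in range i is larger than any key in range i + 1, reading the ranges in order gives a total order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

/** Single-process sketch of TeraSort-style range partitioning; names are hypothetical. */
final class RangePartitionSort {

  /** Picks numPartitions - 1 split points as quantiles of a random sample (keys and sample must be non-empty). */
  static long[] chooseSplitPoints(List<Long> keys, int numPartitions, int sampleSize, long seed) {
    Random random = new Random(seed);
    List<Long> sample = new ArrayList<>();
    for (int i = 0; i < sampleSize; i++) {
      sample.add(keys.get(random.nextInt(keys.size()))); // sampling with replacement
    }
    Collections.sort(sample);
    long[] splits = new long[numPartitions - 1];
    for (int i = 1; i < numPartitions; i++) {
      splits[i - 1] = sample.get(i * sample.size() / numPartitions);
    }
    return splits;
  }

  /** Index of the range a key falls into; a key equal to a split point goes to the right of it. */
  static int partitionFor(long key, long[] splits) {
    int pos = Arrays.binarySearch(splits, key);
    return pos >= 0 ? pos + 1 : -(pos + 1);
  }

  /** Routes keys to ranges, then sorts each range independently. */
  static List<List<Long>> sortByRanges(List<Long> keys, int numPartitions, int sampleSize, long seed) {
    long[] splits = chooseSplitPoints(keys, numPartitions, sampleSize, seed);
    List<List<Long>> partitions = new ArrayList<>();
    for (int i = 0; i < numPartitions; i++) {
      partitions.add(new ArrayList<>());
    }
    for (long key : keys) {
      partitions.get(partitionFor(key, splits)).add(key);
    }
    for (List<Long> partition : partitions) {
      Collections.sort(partition); // in a real system, each range is sorted on its own worker
    }
    return partitions;
  }
}
```

In a distributed setting each range would be sorted by a different worker and written to its own ordered file; the sample only has to be large enough for the ranges to come out roughly balanced.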
> Having ordering within a window/key seems to make sense to me from a
> streaming perspective too. For batch, where the default window is
> everything, you would get ordering for all of the data with the same
> key/partition. For streaming, you could get ordering within a given window
> for a key/partition. I can see a number of uses for this, and combining
> different panes/windows together while still preserving ordering is not
> that hard using a streaming merge sort.
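A streaming merge of sorted runs (for example, the sorted output of several panes or windows for one key) only has to hold the current head of each run in memory. A minimal sketch, assuming every run is already sorted by the same comparator; `SortedRunMerger` is a hypothetical name, not a Beam class:

```java
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Consumer;

/** Streaming k-way merge of already-sorted runs; a hypothetical helper, not a Beam class. */
final class SortedRunMerger {

  /** Heap entry: the current head of one run, plus the iterator over the rest of it. */
  private static final class Head<T> {
    final T value;
    final Iterator<T> rest;

    Head(T value, Iterator<T> rest) {
      this.value = value;
      this.rest = rest;
    }
  }

  /** Emits the merged, sorted stream; memory use is one element per run. */
  static <T> void merge(List<? extends Iterable<T>> runs, Comparator<? super T> order, Consumer<? super T> out) {
    PriorityQueue<Head<T>> heap = new PriorityQueue<>((a, b) -> order.compare(a.value, b.value));
    for (Iterable<T> run : runs) {
      Iterator<T> it = run.iterator();
      if (it.hasNext()) {
        heap.add(new Head<>(it.next(), it));
      }
    }
    while (!heap.isEmpty()) {
      Head<T> smallest = heap.poll(); // the smallest head across all runs is next in the output
      out.accept(smallest.value);
      if (smallest.rest.hasNext()) {
        heap.add(new Head<>(smallest.rest.next(), smallest.rest));
      }
    }
  }
}
```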
>
> Yes, there is a metadata problem about which transforms preserve the
> ordering of a PCollection and which do not, but for most things except
> sinks, that can come as an optimization that transforms opt into over
> time. If we assume the default is that transforms do not preserve order
> (which is what Pig does), then you would write your DAG with an order-by
> transform in front of the sink. The only issue here would be sinks that
> might not preserve ordering (like BigQuery, which does a dedupe prior to
> writing out data).
>
> - Bobby
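The conservative default described in that last paragraph can be illustrated with a small, hypothetical order-tracking model: each step declares whether it creates, preserves, or destroys ordering, and an undeclared step is treated as destroying it. None of these names (`OrderTracking`, `OrderEffect`, `Step`, `isOrderedAtEnd`) exist in Beam; the sketch only shows why an explicit sort placed directly before the sink is the safe pattern.

```java
import java.util.List;

/** Hypothetical order-tracking model; not a Beam API. */
final class OrderTracking {

  /** What a step declares about the ordering of its output. */
  enum OrderEffect { CREATES, PRESERVES, DESTROYS, UNDECLARED }

  static final class Step {
    final String name;
    final OrderEffect effect;

    Step(String name, OrderEffect effect) {
      this.name = name;
      this.effect = effect;
    }
  }

  /** Returns true only if the last step's output is known to be ordered. */
  static boolean isOrderedAtEnd(List<Step> steps) {
    boolean ordered = false; // the pipeline's input is assumed to be unordered
    for (Step step : steps) {
      switch (step.effect) {
        case CREATES:
          ordered = true; // e.g. an explicit order-by step
          break;
        case PRESERVES:
          break; // keep whatever ordering the input had
        default:
          ordered = false; // DESTROYS, and the conservative default for UNDECLARED
      }
    }
    return ordered;
  }
}
```

Under this default, a sort followed only by order-preserving steps reaches the sink ordered, while any undeclared step after the sort makes the result unordered again.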

Re: Add Sorting Class?

2016-05-26 Thread Jesse Anderson
Another perspective is to look at other projects in the Hadoop ecosystem.

Impala used to require a LIMIT any time you did an ORDER BY. They've since
removed this limitation.

Hive has two sorting options. ORDER BY produces a global order. SORT BY
orders the rows within each reducer's partition.

Re: Add Sorting Class?

2016-05-26 Thread Jesse Anderson
I had a similar thought, but wasn't sure if that violated a tenet of Beam.

I'm thinking an ordered sink could wrap around another sink. I could see
something like:
collection.apply(OrderedSink.Timestamp.write(TextIO.Write.to(...)));
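To make the wrapping idea concrete, here is a hypothetical decorator in plain Java. It is not Beam's sink API: `OrderedWriter` and its `write`/`close` methods are assumptions, and the wrapped writer is modeled as a `Consumer`. It buffers every record and sorts only on close, so it is limited to data that fits in one process's memory, which is exactly the constraint a real ordered sink would have to get around.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical ordering decorator around another writer; not a Beam API. */
final class OrderedWriter<T> implements AutoCloseable {
  private final Consumer<? super T> delegate;
  private final Comparator<? super T> order;
  private final List<T> buffer = new ArrayList<>();

  OrderedWriter(Consumer<? super T> delegate, Comparator<? super T> order) {
    this.delegate = delegate;
    this.order = order;
  }

  /** Buffers the record; nothing reaches the wrapped writer yet. */
  void write(T record) {
    buffer.add(record);
  }

  /** Sorts everything buffered, then forwards it to the wrapped writer in order. */
  @Override
  public void close() {
    buffer.sort(order);
    buffer.forEach(delegate);
    buffer.clear();
  }
}
```

For records carrying a timestamp, the comparator would typically be something like `Comparator.comparingLong(...)` over that field.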

Re: Add Sorting Class?

2016-05-26 Thread Robert Bradshaw
As Frances alluded to, it's also really hard to reconcile the notion
of a globally ordered PCollection in the context of a streaming
pipeline. Sorting also imposes conditions on partitioning, which we
intentionally leave unspecified for maximum flexibility in the
runtime. One also gets into the question of whether particular
operations are order-creating, order-preserving, or order-destroying
and how much extra overhead is required to maintain these properties
for intermediate collections.

Your mention of sorting by time is interesting, as this is the
inherent sort dimension in streaming (and we use features like
windowing and triggering to do correct time-based grouping despite
real-time skew). Other than that, all the uses of sorting I've seen
have been limited to portions of data small enough to be produced by
(and consumed by) a single machine (so GBs at most, not TBs or PBs).

All that aside, I could see a more tractable case being made for
ordering (partitioning, etc.) a particular materialization of a
PCollection, i.e. being sorted would not be a property of a
PCollection itself, but could be provided by a sink (e.g. one could
have a sink that promises to write its records in a particular order
within and across shards). It's not inconceivable that this could be
done in a way that is composable with (a large class of) existing
sinks, e.g. given a FileBasedSink and intra/inter-shard sorting
specifications, one could produce a bounded sink that writes "sorted"
files. Lots of design work TBD...

- Robert
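One way to pin down what a "sorted" sink would promise is to state the intra-shard and inter-shard properties as checks. A sketch, assuming the shards are read back as in-memory lists; `ShardOrderCheck` is a hypothetical helper, not part of `FileBasedSink`:

```java
import java.util.Comparator;
import java.util.List;

/** Hypothetical checks for the guarantees a "sorted" sink might make; not a Beam API. */
final class ShardOrderCheck {

  /** Intra-shard: every shard is individually sorted. */
  static <T> boolean sortedWithinShards(List<? extends List<T>> shards, Comparator<? super T> order) {
    for (List<T> shard : shards) {
      for (int i = 1; i < shard.size(); i++) {
        if (order.compare(shard.get(i - 1), shard.get(i)) > 0) {
          return false;
        }
      }
    }
    return true;
  }

  /**
   * Inter-shard: no record in a shard sorts after any record in a later shard.
   * Assumes shards are already sorted internally, so first/last are each shard's min/max.
   */
  static <T> boolean sortedAcrossShards(List<? extends List<T>> shards, Comparator<? super T> order) {
    T maxSoFar = null;
    boolean seenAny = false;
    for (List<T> shard : shards) {
      if (shard.isEmpty()) {
        continue; // an empty shard imposes no constraint
      }
      if (seenAny && order.compare(maxSoFar, shard.get(0)) > 0) {
        return false;
      }
      maxSoFar = shard.get(shard.size() - 1);
      seenAny = true;
    }
    return true;
  }
}
```

Shards that were hash-partitioned and then sorted individually pass the first check but generally fail the second; range-partitioned, individually sorted shards pass both.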

Re: Add Sorting Class?

2016-05-26 Thread Jesse Anderson
@Frances, great analysis. I'm hoping this serves as the starting point for
the discussion.

It really comes down to: is this a nice-to-have or a show-stopping
requirement? As you mention, it depends on the use case. I've taught at
large financial companies where (global) sorting was a real and
show-stopping use case. Theirs was a large end-of-day report that had to be
globally sorted and consumed by many other groups. Sorry, I can't be more
specific.

Thanks,

Jesse

On Thu, May 26, 2016 at 10:19 AM Frances Perry wrote:

> Currently the Beam model doesn't provide the functionality to do sorting,
> so this is a pretty deep feature request. Let's separate the discussion
> into value sorting and global sorting.
>
> For value sorting, you need to be able to specify some property of the
> value (often called a secondary key) and have the GroupByKey/shuffle
> implementation sort values for a given key by the secondary key. This is a
> pretty common use case, and I think exposing this in Beam would make a lot
> of sense. The Hadoop and Cloud Dataflow shuffle implementations support
> this, for example, so it may just be a matter of figuring out how best to
> expose it to users. In FlumeJava we had you explicitly ParDo to pair values
> with a string "sort key", so you'd GroupByKey on a
> PCollection<KV<K, KV<String, V>>> and get back the values sorted
> lexicographically by the String. It's a bit gross to make users come up
> with an encoding of their ordering that sorts lexicographically. It looks
> like Crunch [1] uses a general sort key, but that likely won't interact
> cleanly with Beam's use of encoded keys for comparisons. It would be nice
> to think about whether there's a cleaner way.
>
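The FlumeJava-style approach in the paragraph above leaves the choice of a lexicographically ordered encoding to the user. A minimal in-memory sketch, assuming the secondary key is a `long` timestamp: `sortKey` flips the sign bit and zero-pads to 16 hex digits so that string order matches numeric order, and `groupByKeySortedValues` stands in for a shuffle that returns each key's values ordered by that string. All names are hypothetical; this is neither a Beam nor a FlumeJava API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** In-memory sketch of value sorting by a string sort key; names are hypothetical. */
final class SecondaryKeySort {

  /** Stand-in for KV<K, KV<String, V>> with String keys and values. */
  static final class Keyed {
    final String key;
    final String sortKey;
    final String value;

    Keyed(String key, String sortKey, String value) {
      this.key = key;
      this.sortKey = sortKey;
      this.value = value;
    }
  }

  /**
   * Encodes a long so that lexicographic string order equals numeric order:
   * XOR with Long.MIN_VALUE flips the sign bit (so negatives come first), and
   * 16 zero-padded hex digits keep every key the same length.
   */
  static String sortKey(long value) {
    return String.format("%016x", value ^ Long.MIN_VALUE);
  }

  /** The "ParDo" step: pair each value with its encoded sort key. */
  static Keyed pairWithSortKey(String key, long timestamp, String value) {
    return new Keyed(key, sortKey(timestamp), value);
  }

  /** Stand-in for a GroupByKey whose shuffle returns each key's values sorted by sort key. */
  static Map<String, List<String>> groupByKeySortedValues(List<Keyed> input) {
    // key -> (sort key -> values carrying that sort key, in arrival order)
    Map<String, TreeMap<String, List<String>>> grouped = new TreeMap<>();
    for (Keyed kv : input) {
      grouped.computeIfAbsent(kv.key, k -> new TreeMap<>())
          .computeIfAbsent(kv.sortKey, s -> new ArrayList<>())
          .add(kv.value);
    }
    Map<String, List<String>> result = new TreeMap<>();
    for (Map.Entry<String, TreeMap<String, List<String>>> entry : grouped.entrySet()) {
      List<String> values = new ArrayList<>();
      for (List<String> sameSortKey : entry.getValue().values()) {
        values.addAll(sameSortKey);
      }
      result.put(entry.getKey(), values);
    }
    return result;
  }
}
```

The encoding step is the awkward part: a naive `Long.toString` key would put 100 before 9. Java's `%x` conversion prints a negative `long` as its unsigned two's-complement value, which is what lets the XOR with `Long.MIN_VALUE` preserve numeric order.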
> For global sorting, you need to be able to generate and maintain
> orderedness across the elements in a PCollection and have a way to know how
> to partition the PCollection into balanced, sorted subchunks. This would
> have a pretty large impact on the Beam model and potentially on many of the
> runners. Looking at the Crunch sort [1], it requires users to provide the
> partitioning function if they want it to scale beyond a single reducer. I'd
> love to see if there's a way to do better. It can also have a pretty big
> impact on the ability to parallelize execution efficiently (things like
> dynamic work rebalancing [2] become trickier). Within Google [3], we've
> found that this tends to be something that users ask for but don't really
> have a strong use case for. Usually either Top suffices or they would
> rather redo the algorithm into something that can parallelize more
> efficiently without relying on a global sort. Though of course, without
> this, we can't actually run the TeraSort benchmark in Beam. ;-)
>
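For reference, the idea behind a Top-style transform is a bounded min-heap, which is why its memory use scales with k rather than with the input size. A plain-Java sketch (hypothetical `TopK` class, not Beam's `Top` implementation):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Bounded top-k selection; a hypothetical sketch, not Beam's Top. */
final class TopK {

  /** Returns the k largest elements in descending order, holding at most k in memory. */
  static <T> List<T> largest(Iterable<T> input, int k, Comparator<? super T> order) {
    if (k <= 0) {
      return new ArrayList<>();
    }
    // Min-heap under `order`: the root is the smallest of the current top k.
    PriorityQueue<T> heap = new PriorityQueue<>(k, order);
    for (T element : input) {
      if (heap.size() < k) {
        heap.add(element);
      } else if (order.compare(element, heap.peek()) > 0) {
        heap.poll(); // evict the current smallest to make room
        heap.add(element);
      }
    }
    List<T> result = new ArrayList<>(heap);
    result.sort((a, b) -> order.compare(b, a)); // descending
    return result;
  }
}
```

Partial top-k lists from different workers can be merged by running them through the same function again, which is what makes this shape easy to parallelize, unlike a full global sort.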
> And of course there's the impact of the unified model on all this ;-) I
> think these ideas would translate to windowed PCollections OK, but I would
> want to think carefully about it.
>
> [1] https://crunch.apache.org/user-guide.html#sorting
> [2] https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow
> [3] https://cloud.google.com/blog/big-data/2016/02/history-of-massive-scale-sorting-experiments-at-google


Add Sorting Class?

2016-05-26 Thread Jesse Anderson
This is somewhat the continuation of my thread "Writing Out List."

Right now, the only way to do sorting is with the Top class. This works
well, but has the constraint of fitting in memory.

A common batch use case is to take a large file and sort it, for example
sorting a large (several GB) report file by timestamp. Right now, this
isn't built into Beam. I think it should be.

I'll hold up Crunch's Sort class
<https://crunch.apache.org/apidocs/0.11.0/org/apache/crunch/lib/Sort.html>
as an example of what this class could look like.

Thanks,

Jesse