Totally agree that orderings of values within a
key[-window[-pane]]-grouping would be quite useful, and they make total
sense in the model (primarily because elements themselves are never
partitioned).

On Fri, May 27, 2016 at 11:31 AM, Bobby Evans
<ev...@yahoo-inc.com.invalid> wrote:
> If you have local ordering within a group-by like what Frances said, you can
> build a global total ordering on top of it by partitioning the data into
> ranges.  Most terasort implementations do this.  They subsample the data to
> get fairly evenly distributed key ranges, and then do individual smaller
> sorts that are written out to ordered files.
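>
> As an illustration only (plain Java, nothing Beam-specific; the 1% sampling
> rate and all names are made up), the range-partition trick looks roughly
> like:
>
>   import java.util.*;
>   import java.util.stream.*;
>
>   class RangePartitionSortSketch {
>     // Returns buckets that are each locally sorted; concatenating the
>     // buckets in order yields a global total order.
>     static List<List<String>> sortByRanges(List<String> records,
>                                            int numBuckets, Random rnd) {
>       // Subsample and sort the sample to pick evenly spaced split points.
>       List<String> sample = records.stream()
>           .filter(r -> rnd.nextDouble() < 0.01)
>           .sorted()
>           .collect(Collectors.toList());
>       List<String> splits = new ArrayList<>();
>       for (int i = 1; i < numBuckets && !sample.isEmpty(); i++) {
>         splits.add(sample.get(i * sample.size() / numBuckets));
>       }
>       // Route each record to the bucket whose key range contains it.
>       List<List<String>> buckets = new ArrayList<>();
>       for (int i = 0; i < numBuckets; i++) buckets.add(new ArrayList<>());
>       for (String r : records) {
>         int idx = Collections.binarySearch(splits, r);
>         buckets.get(idx >= 0 ? idx + 1 : -idx - 1).add(r);
>       }
>       // Each bucket is small enough to sort independently (e.g. per worker).
>       for (List<String> bucket : buckets) Collections.sort(bucket);
>       return buckets;
>     }
>   }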
>
> Having ordering within a window/key seems to make sense to me from a
> streaming perspective too.  So for batch, where the default window is
> everything, you would get ordering for all of the data with the same
> key/partition.  For streaming you could get ordering within a given window
> for a key/partition.  I can see a number of uses for this, and combining
> different panes/windows while still preserving ordering is not that hard
> using a streaming merge sort.
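>
> The merge itself is just a standard k-way merge; a rough standalone sketch
> (plain Java, with iterators standing in for already-sorted panes/windows):
>
>   import java.util.*;
>
>   class MergeSortedPanesSketch {
>     // Merges per-pane iterators that are each already sorted, preserving
>     // the overall order with a priority queue (standard k-way merge).
>     static <T extends Comparable<T>> List<T> merge(List<Iterator<T>> panes) {
>       PriorityQueue<Map.Entry<T, Iterator<T>>> heap =
>           new PriorityQueue<>((a, b) -> a.getKey().compareTo(b.getKey()));
>       for (Iterator<T> it : panes) {
>         if (it.hasNext()) heap.add(new AbstractMap.SimpleEntry<>(it.next(), it));
>       }
>       List<T> out = new ArrayList<>();
>       while (!heap.isEmpty()) {
>         Map.Entry<T, Iterator<T>> head = heap.poll();
>         out.add(head.getKey());   // smallest remaining element across panes
>         Iterator<T> it = head.getValue();
>         if (it.hasNext()) heap.add(new AbstractMap.SimpleEntry<>(it.next(), it));
>       }
>       return out;
>     }
>   }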
> Yes, there is a metadata problem about which transforms preserve the
> ordering of a PCollection and which ones do not, but for most things except
> sinks that can come as an optimization that transforms can adapt to over
> time.  If we assume the default is that they do not preserve order (which is
> what Pig does), then you would write your DAG with an order-by transform in
> front of the sink.  The only issue here would be for sinks that might not
> preserve ordering (like BigQuery, which does a dedupe prior to writing out
> data).
>
> - Bobby
>
> On Thursday, May 26, 2016 3:40 PM, Jesse Anderson <je...@smokinghand.com>
> wrote:
>
> Another perspective is to look at other projects in the Hadoop ecosystem.
>
> Impala used to require a LIMIT any time you did an ORDER BY. They've since
> removed this limitation.
>
> Hive has two sorting options: ORDER BY does a global sort, while SORT BY
> only sorts the data within each partition.
>
> On Thu, May 26, 2016 at 12:35 PM Jesse Anderson <je...@smokinghand.com>
> wrote:
>
>> I had a similar thought, but wasn't sure if that violated a tenet of Beam.
>>
>> I'm thinking an ordered sink could wrap around another sink. I could see
>> something like:
>> collection.apply(OrderedSink.Timestamp.write(TextIO.Write.to(...)));
>>
>> On Thu, May 26, 2016 at 12:26 PM Robert Bradshaw
>> <rober...@google.com.invalid> wrote:
>>
>>> As Frances alluded to, it's also really hard to reconcile the notion
>>> of a globally ordered PCollection in the context of a streaming
>>> pipeline. Sorting also imposes conditions on partitioning, which we
>>> intentionally leave unspecified for maximum flexibility in the
>>> runtime. One also gets into the question of whether particular
>>> operations are order-creating, order-preserving, or order-destroying
>>> and how much extra overhead is required to maintain these properties
>>> for intermediate collections.
>>>
>>> Your mention of sorting by time is interesting, as this is the
>>> inherent sort dimension in streaming (and we use features like
>>> windowing and triggering to do correct time-based grouping despite
>>> real-time skew). Other than that, all the uses of sorting I've seen
>>> have been limited to portions of data small enough to be produced by
>>> (and consumed by) a single machine (so tops GBs, not TBs or PBs).
>>>
>>> All that aside, I could see a more tractable case being made for
>>> ordering (partitioning, etc.) a particular materialization of a
>>> PCollection, i.e. being sorted would not be a property of a
>>> PCollection itself, but could be provided by a sink (e.g. one could
>>> have a sink that promises to write its records in a particular order
>>> within and across shards). It's not inconceivable that this could be
>>> done in a way that is composable with (a large class of) existing
>>> sinks, e.g. given a FileBasedSink and intra/inter-shard sorting
>>> specifications, one could produce a bounded sink that writes "sorted"
>>> files. Lots of design work TBD...
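>>>
>>> To make "intra-shard sorting" concrete, here is a purely hypothetical
>>> sketch (none of these classes exist in Beam today): a per-shard writer
>>> wrapper that buffers records, sorts them at close time, and only then
>>> hands them to the wrapped sink's writer.
>>>
>>>   import java.util.ArrayList;
>>>   import java.util.Comparator;
>>>   import java.util.List;
>>>   import java.util.function.Consumer;
>>>
>>>   class SortingShardWriter<T> {
>>>     private final List<T> buffer = new ArrayList<>();
>>>     private final Comparator<T> order;           // intra-shard order spec
>>>     private final Consumer<T> underlyingWriter;  // stands in for the wrapped sink
>>>
>>>     SortingShardWriter(Comparator<T> order, Consumer<T> underlyingWriter) {
>>>       this.order = order;
>>>       this.underlyingWriter = underlyingWriter;
>>>     }
>>>
>>>     void write(T record) {
>>>       buffer.add(record);   // buffer instead of writing immediately
>>>     }
>>>
>>>     void close() {
>>>       buffer.sort(order);                // sort once the shard is complete
>>>       buffer.forEach(underlyingWriter);  // emit in order to the real writer
>>>     }
>>>   }
>>>
>>> Inter-shard ordering would additionally need a range-partitioning step up
>>> front so that shard N's records all sort before shard N+1's.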
>>>
>>> - Robert
>>>
>>>
>>>
>>>
>>> On Thu, May 26, 2016 at 11:32 AM, Jesse Anderson <je...@smokinghand.com>
>>> wrote:
>>> > @Frances great analysis. I'm hoping this serves as the starting point
>>> > for the discussion.
>>> >
>>> > It really comes down to: is this a nice-to-have or a show-stopping
>>> > requirement? As you mention, it comes down to the use case. I've taught
>>> > at large financial companies where (global) sorting was a real and
>>> > show-stopping use case. Theirs was a large end-of-day report that had
>>> > to be globally sorted and consumed by many other groups. Sorry, I can't
>>> > be more specific.
>>> >
>>> > Thanks,
>>> >
>>> > Jesse
>>> >
>>> > On Thu, May 26, 2016 at 10:19 AM Frances Perry <f...@google.com.invalid>
>>> > wrote:
>>> >
>>> >> Currently the Beam model doesn't provide the functionality to do
>>> >> sorting, so this is a pretty deep feature request. Let's separate the
>>> >> discussion into value sorting and global sorting.
>>> >>
>>> >> For value sorting, you need to be able to specify some property of the
>>> >> value (often called a secondary key) and have the GroupByKey/shuffle
>>> >> implementation sort the values for a given key by that secondary key.
>>> >> This is a pretty common use case, and I think exposing this in Beam
>>> >> would make a lot of sense. The Hadoop and Cloud Dataflow shuffle
>>> >> implementations support this, for example. So it may just be a matter
>>> >> of figuring out how best to expose it to users. In FlumeJava we had you
>>> >> explicitly ParDo to pair values with a string "sort key", so you'd
>>> >> GroupByKey on a PCollection<KV<Key, KV<String, Value>>> and get back
>>> >> the values sorted lexicographically by that String. It's a bit gross
>>> >> for users to have to come up with a way to order things that sorts
>>> >> lexicographically. It looks like Crunch [1] uses a general sort key --
>>> >> but that likely won't interact cleanly with Beam's use of encoded keys
>>> >> for comparisons. It would be nice to think about whether there's a
>>> >> cleaner way.
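>>> >>
>>> >> Concretely, that FlumeJava-style workaround looks roughly like the
>>> >> fragment below (illustrative only: it assumes the usual Beam imports,
>>> >> non-negative long values so zero-padding makes lexicographic order
>>> >> match numeric order, and it is not an existing secondary-sort API):
>>> >>
>>> >>   PCollection<KV<String, Long>> input = ...;
>>> >>   // Pair each value with a zero-padded string secondary key.
>>> >>   PCollection<KV<String, KV<String, Long>>> withSortKeys = input.apply(
>>> >>       MapElements.via(
>>> >>           new SimpleFunction<KV<String, Long>, KV<String, KV<String, Long>>>() {
>>> >>             @Override
>>> >>             public KV<String, KV<String, Long>> apply(KV<String, Long> e) {
>>> >>               return KV.of(e.getKey(),
>>> >>                   KV.of(String.format("%019d", e.getValue()), e.getValue()));
>>> >>             }
>>> >>           }));
>>> >>   // Today each group's values still have to be sorted in memory after
>>> >>   // GroupByKey; shuffle-level secondary sort would remove that step.
>>> >>   withSortKeys.apply(GroupByKey.<String, KV<String, Long>>create());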
>>> >>
>>> >> For global sorting, you need to be able to generate and maintain
>>> >> ordering across the elements in a PCollection and have a way to know
>>> >> how to partition the PCollection into balanced, sorted subchunks. This
>>> >> would have a pretty large impact on the Beam model and potentially on
>>> >> many of the runners. Looking at the Crunch sort [1], it requires users
>>> >> to provide the partitioning function if they want it to scale beyond a
>>> >> single reduce. I'd love to see if there's a way to do better. It also
>>> >> can have a pretty big impact on the ability to efficiently parallelize
>>> >> execution (things like dynamic work rebalancing [2] become trickier).
>>> >> Within Google [3], we've found that this tends to be something that
>>> >> users ask for, but don't really have a strong use case for. It's
>>> >> usually the case that Top suffices or that they would rather redo the
>>> >> algorithm into something that can parallelize more efficiently without
>>> >> relying on a global sort. Though of course, without this, we can't
>>> >> actually do the TeraSort benchmark in Beam. ;-)
>>> >>
>>> >> And of course there's the impact of the unified model on all this. ;-)
>>> >> I think these ideas would translate to windowed PCollections OK, but I
>>> >> would want to think carefully about it.
>>> >>
>>> >> [1] https://crunch.apache.org/user-guide.html#sorting
>>> >> [2] https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow
>>> >> [3] https://cloud.google.com/blog/big-data/2016/02/history-of-massive-scale-sorting-experiments-at-google
>>> >>
>>> >>
>>> >> On Thu, May 26, 2016 at 8:56 AM, Jesse Anderson <je...@smokinghand.com>
>>> >> wrote:
>>> >>
>>> >> > This is somewhat the continuation of my thread "Writing Out
>>> >> > List<String>."
>>> >> >
>>> >> > Right now, the only way to do sorting is with the Top class. This
>>> >> > works well, but has the constraint of fitting in memory.
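>>> >> >
>>> >> > For reference, the in-memory path today looks roughly like this
>>> >> > (assuming the usual Beam imports; the element type is illustrative):
>>> >> >
>>> >> >   PCollection<Long> timestamps = ...;
>>> >> >   // Top.largest produces a single List<Long> of the 10 largest
>>> >> >   // elements, so that list has to fit in one worker's memory.
>>> >> >   PCollection<List<Long>> top10 =
>>> >> >       timestamps.apply(Top.<Long>largest(10));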
>>> >> >
>>> >> > A common batch use case is to take a large file and sort it. For
>>> >> > example, this would be sorting a large (several GB) report file by
>>> >> > timestamp. As of right now, this isn't built into Beam. I think it
>>> >> > should be.
>>> >> >
>>> >> > I'll hold out Crunch's Sort class
>>> >> > <https://crunch.apache.org/apidocs/0.11.0/org/apache/crunch/lib/Sort.html>
>>> >> > as an example of what this class could look like.
>>> >> >
>>> >> > Thanks,
>>> >> >
>>> >> > Jesse
>>> >> >
>>> >>
>>>
>>
>
>
>
