Another perspective is to look at other projects in the Hadoop ecosystem.

Impala used to require a LIMIT any time you did an ORDER BY. They've since
removed this limitation.

Hive has two sorting options. ORDER BY produces a global order. SORT BY only
orders the rows within each reducer's partition.
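
As a rough illustration of the difference between a global order and a
per-partition order, here is a minimal sketch in plain Java. It is not Hive or
Beam code; the class and method names are made up for this example, and the
"partitions" are just in-memory lists.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration only: contrasts per-partition and global ordering.
public class SortScopes {

    // Per-partition sort: each partition is ordered on its own, with no
    // guarantee about the order of records across partitions.
    public static List<List<Integer>> sortWithinPartitions(List<List<Integer>> partitions) {
        List<List<Integer>> result = new ArrayList<>();
        for (List<Integer> partition : partitions) {
            List<Integer> sorted = new ArrayList<>(partition);
            sorted.sort(Comparator.naturalOrder());
            result.add(sorted);
        }
        return result;
    }

    // Global sort: a single total order over every record. Here everything is
    // gathered into one list, which is exactly what stops scaling on big data.
    public static List<Integer> sortGlobally(List<List<Integer>> partitions) {
        List<Integer> all = new ArrayList<>();
        for (List<Integer> partition : partitions) {
            all.addAll(partition);
        }
        all.sort(Comparator.naturalOrder());
        return all;
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions =
                Arrays.asList(Arrays.asList(9, 2, 7), Arrays.asList(4, 8, 1));
        System.out.println(sortWithinPartitions(partitions)); // [[2, 7, 9], [1, 4, 8]]
        System.out.println(sortGlobally(partitions));         // [1, 2, 4, 7, 8, 9]
    }
}
```

The per-partition result is sorted inside each list, but reading the lists one
after another does not give a sorted sequence. That gap is what a global sort
has to close.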

On Thu, May 26, 2016 at 12:35 PM Jesse Anderson <je...@smokinghand.com>
wrote:

> I had a similar thought, but wasn't sure if that violated a tenet of Beam.
>
> I'm thinking an ordered sink could wrap around another sink. I could see
> something like:
> collection.apply(OrderedSink.Timestamp.write(TextIO.Write.to(...)));
>
> On Thu, May 26, 2016 at 12:26 PM Robert Bradshaw
> <rober...@google.com.invalid> wrote:
>
>> As Frances alluded to, it's also really hard to reconcile the notion
>> of a globally ordered PCollection in the context of a streaming
>> pipeline. Sorting also imposes conditions on partitioning, which we
>> intentionally leave unspecified for maximum flexibility in the
>> runtime. One also gets into the question of whether particular
>> operations are order-creating, order-preserving, or order-destroying
>> and how much extra overhead is required to maintain these properties
>> for intermediate collections.
>>
>> Your mention of sorting by time is interesting, as this is the
>> inherent sort dimension in streaming (and we use features like
>> windowing and triggering to do correct time-based grouping despite
>> real-time skew). Other than that, all the uses of sorting I've seen
>> have been limited to portions of data small enough to be produced by
>> (and consumed by) a single machine (so tops GBs, not TBs or PBs).
>>
>> All that aside, I could see a more tractable case being made for
>> ordering (partitioning, etc.) a particular materialization of a
>> PCollection, i.e. being sorted would not be a property of a
>> PCollection itself, but could be provided by a sink (e.g. one could
>> have a sink that promises to write its records in a particular order
>> within and across shards). It's not inconceivable that this could be
>> done in a way that is composable with (a large class of) existing
>> sinks, e.g. given a FileBasedSink and intra/inter-shard-sorting
>> specifications, one could produce a bounded sink that writes "sorted"
>> files. Lots of design work TBD...
>>
>> - Robert
>>
>>
>>
>>
>> On Thu, May 26, 2016 at 11:32 AM, Jesse Anderson <je...@smokinghand.com>
>> wrote:
>> > @frances great analysis. I'm hoping this serves as the starting point for
>> > the discussion.
>> >
>> > It really comes down to: is this a nice to have or a show stopping
>> > requirement? As you mention, it comes down to the use case. I've taught at
>> > large financial companies where (global) sorting was a real and show
>> > stopping use case. Theirs was for a large end of day report that had to be
>> > globally sorted and consumed by many other groups. Sorry, I can't be more
>> > specific.
>> >
>> > Thanks,
>> >
>> > Jesse
>> >
>> > On Thu, May 26, 2016 at 10:19 AM Frances Perry <f...@google.com.invalid>
>> > wrote:
>> >
>> >> Currently the Beam model doesn't provide the functionality to do sorting,
>> >> so this is a pretty deep feature request. Let's separate the discussion
>> >> into value sorting and global sorting.
>> >>
>> >> For value sorting, you need to be able to specify some property of the
>> >> value (often called a secondary key) and have the GroupByKey/shuffle
>> >> implementation sort values for a given key by the secondary key. This is a
>> >> pretty common use case, and I think exposing this in Beam would make a lot
>> >> of sense. The Hadoop and the Cloud Dataflow shuffle implementations support
>> >> this, for example. So it may just be a matter of figuring out how best to
>> >> expose it to users. In FlumeJava we had you explicitly ParDo to pair values
>> >> with a string "sort key" so you'd GroupByKey on a PCollection<KV<Key,
>> >> KV<String, Value>>> and get back the Values sorted lexicographically by
>> >> String. It's a bit gross for users to think about a way to order things
>> >> that sorts lexicographically. Looks like Crunch[1] uses a general sort key
>> >> -- but that likely won't interact cleanly with Beam's use of encoded keys
>> >> for comparisons. Would be nice to think about if there's a cleaner way.
>> >>
>> >> For global sorting, you need to be able to generate and maintain
>> >> orderedness across the elements in a PCollection and have a way to know how
>> >> to partition the PCollection into balanced, sorted subchunks. This would
>> >> have a pretty large impact on the Beam model and potentially on many of the
>> >> runners. Looking at the Crunch sort [1], it requires users to provide the
>> >> partitioning function if they want it to scale beyond a single reduce. I'd
>> >> love to see if there's a way to do better. It also can have a pretty big
>> >> impact on the ability to efficiently parallelize execution (things like
>> >> dynamic work rebalancing [2] become trickier). Within Google [3], we've
>> >> found that this tends to be something that users ask for, but don't really
>> >> have a strong use case for. It's usually the case that Top suffices or that
>> >> they would rather redo the algorithm into something that can parallelize
>> >> more efficiently without relying on a global sort. Though of course,
>> >> without this, we can't actually do the TeraSort benchmark in Beam. ;-)
>> >>
>> >> And of course there's the impact of the unified model on all this ;-) I
>> >> think these ideas would translate to windowed PCollections ok, but would
>> >> want to think carefully about it.
>> >>
>> >> [1] https://crunch.apache.org/user-guide.html#sorting
>> >> [2] https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow
>> >> [3] https://cloud.google.com/blog/big-data/2016/02/history-of-massive-scale-sorting-experiments-at-google
>> >>
>> >>
>> >> On Thu, May 26, 2016 at 8:56 AM, Jesse Anderson <je...@smokinghand.com>
>> >> wrote:
>> >>
>> >> > This is somewhat the continuation of my thread "Writing Out
>> >> > List<String>."
>> >> >
>> >> > Right now, the only way to do sorting is with the Top class. This works
>> >> > well, but has the constraint of fitting in memory.
>> >> >
>> >> > A common batch use case is to take a large file and sort it. For example,
>> >> > this would be sorting a large report file (several GB) by timestamp. As of
>> >> > right now, this isn't built into Beam. I think it should be.
>> >> >
>> >> > I'll hold out Crunch's Sort
>> >> > <https://crunch.apache.org/apidocs/0.11.0/org/apache/crunch/lib/Sort.html>
>> >> > class as an example of what this class could look like.
>> >> >
>> >> > Thanks,
>> >> >
>> >> > Jesse
>> >> >
>> >>
>>
>
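
For reference, the value sorting Frances describes in the quoted thread
(group by a key, then order each key's values by a string "sort key") can be
sketched in plain Java. This is only an in-memory illustration under my own
assumptions: it does not use Beam, FlumeJava, or any shuffle implementation,
and the class, the record() helper, and the sample data are all hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only: secondary-key ("value") sorting in memory.
public class SecondarySortSketch {

    // Builds one (key, (sortKey, value)) record, mirroring the
    // KV<Key, KV<String, Value>> shape mentioned in the thread.
    public static Map.Entry<String, Map.Entry<String, String>> record(
            String key, String sortKey, String value) {
        Map.Entry<String, String> pair = new SimpleEntry<>(sortKey, value);
        return new SimpleEntry<>(key, pair);
    }

    // Groups records by key, then orders each group's values
    // lexicographically by their sort key.
    public static Map<String, List<String>> groupAndSortValues(
            List<Map.Entry<String, Map.Entry<String, String>>> records) {
        Map<String, List<Map.Entry<String, String>>> grouped = new TreeMap<>();
        for (Map.Entry<String, Map.Entry<String, String>> r : records) {
            grouped.computeIfAbsent(r.getKey(), k -> new ArrayList<>()).add(r.getValue());
        }
        Map<String, List<String>> result = new TreeMap<>();
        for (Map.Entry<String, List<Map.Entry<String, String>>> group : grouped.entrySet()) {
            List<Map.Entry<String, String>> pairs = group.getValue();
            pairs.sort(Map.Entry.comparingByKey()); // lexicographic on the sort key
            List<String> values = new ArrayList<>();
            for (Map.Entry<String, String> pair : pairs) {
                values.add(pair.getValue());
            }
            result.put(group.getKey(), values);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Map.Entry<String, String>>> records = Arrays.asList(
                record("user1", "2016-05-26T12:35", "c"),
                record("user2", "2016-05-26T08:56", "x"),
                record("user1", "2016-05-26T10:19", "a"),
                record("user1", "2016-05-26T11:32", "b"));
        System.out.println(groupAndSortValues(records)); // {user1=[a, b, c], user2=[x]}
    }
}
```

The timestamps only sort correctly here because they are zero-padded strings,
which is the kind of encoding burden the lexicographic approach puts on users.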
