Totally agree that orderings of values within a key[-window[-pane]]-grouping would be quite useful, and they make total sense in the model (primarily because elements themselves are never partitioned).
On Fri, May 27, 2016 at 11:31 AM, Bobby Evans <ev...@yahoo-inc.com.invalid> wrote:

> If you have local ordering within a group-by like what Frances said, you
> can build global total ordering off of it by partitioning the data into
> ranges. Most TeraSort implementations do this. They subsample the data to
> get a fairly evenly distributed range of keys, and then do individual
> smaller sorts that are written out to ordered files.
>
> Having ordering within a window/key seems to make sense to me from a
> streaming perspective too. So for batch, where the default window is
> everything, you would get ordering for all of the data with the same
> key/partition. For streaming, you could get ordering within a given
> window for a key/partition. I can see a number of uses for this, and
> combining different panes/windows together while still preserving
> ordering is not that hard using a streaming merge sort.
>
> Yes, there is a metadata problem about which transforms preserve ordering
> of a PCollection and which ones do not, but for most things except sinks
> that can come as an optimization that transforms can adapt to over time.
> If we assume the default is that they do not preserve order (which is
> what Pig does), then you would write your DAG with an order-by transform
> in front of the sink. The only issue here would be for sinks that might
> not preserve ordering (like BigQuery, which does a dedupe prior to
> writing out data).
>
> - Bobby
>
> On Thursday, May 26, 2016 3:40 PM, Jesse Anderson <je...@smokinghand.com> wrote:
>
>> Another perspective is to look at other projects in the Hadoop
>> ecosystem.
>>
>> Impala had to have a LIMIT any time you did an ORDER BY. They've since
>> removed this limitation.
>>
>> Hive has two sorting options. ORDER BY does a global order. SORT BY
>> orders everything within each partition.
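The range-partitioned sort Bobby describes can be illustrated outside of Beam. The sketch below is plain Python rather than Beam API, and every name in it is made up for illustration: it subsamples the data to pick split points, routes each record to a range, and sorts each range independently, so the per-range outputs concatenated in order are globally sorted.

```python
import random

def range_partition_sort(records, num_parts, sample_size=100, seed=0):
    """TeraSort-style global sort: subsample to pick roughly balanced
    split points, route each record to a range, and sort each range
    independently. The ranges, read back in order, are globally sorted."""
    rng = random.Random(seed)
    sample = sorted(rng.sample(records, min(sample_size, len(records))))
    # num_parts - 1 roughly evenly spaced split points from the sample.
    splits = [sample[(i + 1) * len(sample) // num_parts]
              for i in range(num_parts - 1)]
    parts = [[] for _ in range(num_parts)]
    for r in records:
        # Route r to the first range whose upper bound exceeds it
        # (linear scan for clarity; real implementations binary-search).
        parts[sum(1 for s in splits if s <= r)].append(r)
    return [sorted(p) for p in parts]  # each "file" is locally sorted

rng = random.Random(1)
data = [rng.randrange(10_000) for _ in range(1_000)]
ranges = range_partition_sort(data, num_parts=4)
merged = [x for part in ranges for x in part]  # plain concatenation
assert merged == sorted(data)
```

Because the subsample approximates the key distribution, the ranges end up roughly balanced, which is what lets the individual sorts run in parallel on separate workers.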
>> On Thu, May 26, 2016 at 12:35 PM Jesse Anderson <je...@smokinghand.com> wrote:
>>
>>> I had a similar thought, but wasn't sure if that violated a tenet of
>>> Beam.
>>>
>>> I'm thinking an ordered sink could wrap around another sink. I could
>>> see something like:
>>>
>>> collection.apply(OrderedSink.Timestamp.write(TextIO.Write.To(...)));
>>>
>>> On Thu, May 26, 2016 at 12:26 PM Robert Bradshaw
>>> <rober...@google.com.invalid> wrote:
>>>
>>>> As Frances alluded to, it's also really hard to reconcile the notion
>>>> of a globally ordered PCollection in the context of a streaming
>>>> pipeline. Sorting also imposes conditions on partitioning, which we
>>>> intentionally leave unspecified for maximum flexibility in the
>>>> runtime. One also gets into the question of whether particular
>>>> operations are order-creating, order-preserving, or order-destroying,
>>>> and how much extra overhead is required to maintain these properties
>>>> for intermediate collections.
>>>>
>>>> Your mention of sorting by time is interesting, as this is the
>>>> inherent sort dimension in streaming (and we use features like
>>>> windowing and triggering to do correct time-based grouping despite
>>>> real-time skew). Other than that, all the uses of sorting I've seen
>>>> have been limited to portions of data small enough to be produced by
>>>> (and consumed by) a single machine (so tops GBs, not TBs or PBs).
>>>>
>>>> All that aside, I could see a more tractable case being made for
>>>> ordering (partitioning, etc.) a particular materialization of a
>>>> PCollection, i.e. being sorted would not be a property of a
>>>> PCollection itself, but could be provided by a sink (e.g. one could
>>>> have a sink that promises to write its records in a particular order
>>>> within and across shards). It's not inconceivable that this could be
>>>> done in a way that is composable with (a large class of) existing
>>>> sinks, e.g. given a FileBasedSink and intra/inter-shard-sorting
>>>> specifications, one could produce a bounded sink that writes "sorted"
>>>> files. Lots of design work TBD...
>>>>
>>>> - Robert
>>>>
>>>> On Thu, May 26, 2016 at 11:32 AM, Jesse Anderson <je...@smokinghand.com> wrote:
>>>>
>>>>> @frances great analysis. I'm hoping this serves as the starting point
>>>>> for the discussion.
>>>>>
>>>>> It really comes down to: is this a nice-to-have or a show-stopping
>>>>> requirement? As you mention, it comes down to the use case. I've
>>>>> taught at large financial companies where (global) sorting was a real
>>>>> and show-stopping use case. Theirs was for a large end-of-day report
>>>>> that had to be globally sorted and consumed by many other groups.
>>>>> Sorry, I can't be more specific.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jesse
>>>>>
>>>>> On Thu, May 26, 2016 at 10:19 AM Frances Perry <f...@google.com.invalid> wrote:
>>>>>
>>>>>> Currently the Beam model doesn't provide the functionality to do
>>>>>> sorting, so this is a pretty deep feature request. Let's separate
>>>>>> the discussion into value sorting and global sorting.
>>>>>>
>>>>>> For value sorting, you need to be able to specify some property of
>>>>>> the value (often called a secondary key) and have the
>>>>>> GroupByKey/shuffle implementation sort values for a given key by the
>>>>>> secondary key. This is a pretty common use case, and I think
>>>>>> exposing this in Beam would make a lot of sense. The Hadoop and
>>>>>> Cloud Dataflow shuffle implementations support this, for example. So
>>>>>> it may just be a matter of figuring out how best to expose it to
>>>>>> users. In FlumeJava we had you explicitly ParDo to pair values with
>>>>>> a string "sort key", so you'd GroupByKey on a
>>>>>> PCollection<KV<Key, KV<String, Value>>> and get back the Values
>>>>>> sorted lexicographically by String.
>>>>>> It's a bit gross for users to have to think up a string encoding
>>>>>> that sorts lexicographically the way they want. Looks like
>>>>>> Crunch [1] uses a general sort key -- but that likely won't interact
>>>>>> cleanly with Beam's use of encoded keys for comparisons. Would be
>>>>>> nice to think about whether there's a cleaner way.
>>>>>>
>>>>>> For global sorting, you need to be able to generate and maintain
>>>>>> orderedness across the elements in a PCollection and have a way to
>>>>>> know how to partition the PCollection into balanced, sorted
>>>>>> subchunks. This would have a pretty large impact on the Beam model
>>>>>> and potentially on many of the runners. Looking at the Crunch
>>>>>> sort [1], it requires users to provide the partitioning function if
>>>>>> they want it to scale beyond a single reduce. I'd love to see if
>>>>>> there's a way to do better. It also can have a pretty big impact on
>>>>>> the ability to efficiently parallelize execution (things like
>>>>>> dynamic work rebalancing [2] become trickier). Within Google [3],
>>>>>> we've found that this tends to be something that users ask for, but
>>>>>> don't really have a strong use case for. It's usually the case that
>>>>>> Top suffices or that they would rather redo the algorithm into
>>>>>> something that can parallelize more efficiently without relying on a
>>>>>> global sort. Though of course, without this, we can't actually do
>>>>>> the TeraSort benchmark in Beam. ;-)
>>>>>>
>>>>>> And of course there's the impact of the unified model on all
>>>>>> this ;-) I think these ideas would translate to windowed
>>>>>> PCollections OK, but I'd want to think carefully about it.
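The FlumeJava pattern Frances describes, including its lexicographic wrinkle, can be sketched in plain Python. This is an illustration of the idea, not Beam or FlumeJava API, and all names in it are made up:

```python
from collections import defaultdict

def group_by_key_sorted(pairs):
    """Simulates a GroupByKey over (key, (sort_key, value)) elements
    where each group's values come back sorted lexicographically by
    the string sort_key -- the secondary-key pattern described above."""
    groups = defaultdict(list)
    for key, (sort_key, value) in pairs:
        groups[key].append((sort_key, value))
    return {k: [v for _, v in sorted(vs, key=lambda kv: kv[0])]
            for k, vs in groups.items()}

# The "gross" part: to sort numerically by timestamp, the user must
# zero-pad it so lexicographic order matches numeric order
# ("900" > "1700" as strings, but "0000000900" < "0000001700").
events = [
    ("user1", ("0000001700", "login")),
    ("user1", ("0000000900", "signup")),
    ("user2", ("0000001200", "login")),
]
grouped = group_by_key_sorted(events)
assert grouped["user1"] == ["signup", "login"]
```

A general sort key with a user-supplied comparator (the Crunch approach) avoids the string-encoding step, at the cost of not composing with comparisons on encoded bytes.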
>>>>>> [1] https://crunch.apache.org/user-guide.html#sorting
>>>>>> [2] https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow
>>>>>> [3] https://cloud.google.com/blog/big-data/2016/02/history-of-massive-scale-sorting-experiments-at-google
>>>>>>
>>>>>> On Thu, May 26, 2016 at 8:56 AM, Jesse Anderson <je...@smokinghand.com> wrote:
>>>>>>
>>>>>>> This is somewhat the continuation of my thread "Writing Out
>>>>>>> List<String>."
>>>>>>>
>>>>>>> Right now, the only way to do sorting is with the Top class. This
>>>>>>> works well, but has the constraint of fitting in memory.
>>>>>>>
>>>>>>> A common batch use case is to take a large file and sort it. For
>>>>>>> example, this would be sorting a large report file (several GB) by
>>>>>>> timestamp. As of right now, this isn't built into Beam. I think it
>>>>>>> should be.
>>>>>>>
>>>>>>> I'll hold out Crunch's Sort class
>>>>>>> <https://crunch.apache.org/apidocs/0.11.0/org/apache/crunch/lib/Sort.html>
>>>>>>> as an example of what this class could look like.
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Jesse
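For the larger-than-memory batch case Jesse raises, the standard single-machine technique is an external sort: sort bounded chunks, spill each as a sorted run, and stream-merge the runs. A minimal plain-Python sketch under those assumptions (not Beam API; the names are illustrative):

```python
import heapq
import tempfile

def external_sort(lines, chunk_size=10_000):
    """Sort an arbitrarily large stream of text lines in bounded memory:
    sort fixed-size chunks, spill each to a temp file as a sorted run,
    then lazily combine the runs with a streaming merge (heapq.merge)."""
    runs, chunk = [], []

    def spill():
        f = tempfile.TemporaryFile(mode="w+")
        f.writelines(sorted(chunk))
        f.seek(0)
        runs.append(f)
        chunk.clear()

    for line in lines:
        chunk.append(line)
        if len(chunk) >= chunk_size:
            spill()
    if chunk:
        spill()
    # heapq.merge iterates the sorted runs and yields one sorted stream.
    yield from heapq.merge(*runs)

data = [f"{n:06d}\n" for n in (42, 7, 1000, 3, 99)]
assert list(external_sort(iter(data), chunk_size=2)) == sorted(data)
```

The streaming merge here is the same mechanism Bobby mentions for combining sorted panes/windows while preserving order, so the two ideas share an implementation core.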