Nice pointers to the Spark feature. That's interesting. A couple of thoughts:

- Totally different from per-group ordering (which fits in Beam without deep new model features).
- Related to ordered transport, actually, because that is how the ordering produced per partition would actually result in ordered processing. I was wrong about that earlier, if this is how ordering manifests.
- Seems likely most runners have the needed capabilities to implement it.
- What about late arrivals in streaming? You can use the watermark to deliver in event time order, but for other orders I'm not sure how to meet the use case.
- Does this meet the dataframe need? I'm not really familiar with which pandas sorting operations make sense for distributed data.
- Does this apply to ORDER BY without LIMIT in SQL, to enable more TPC-DS queries?
- Similar to state & timers in that it is a partitioned but non-aggregate processing paradigm.
- The particular partitions are not really of interest. That also came up with GroupIntoBatches in order to use state & timers. We want the runner to choose partitioning, preferably supporting dynamic repartitioning with work stealing.

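To make "reshuffle into ordered partitions of the keyspace" concrete, here is a minimal sketch in plain Python of a range-partitioned sort. It is only an illustration of the general idea, not Spark's or Beam's implementation: the function names `choose_boundaries` and `range_partition_sort`, the sampling strategy, and the `sample_size` parameter are all assumptions made up for this example.

```python
import bisect
import random


def choose_boundaries(keys, num_partitions, sample_size=100, seed=0):
    """Pick up to num_partitions - 1 split points from a random sample of keys.

    Hypothetical helper: a real system would sample in a distributed way.
    """
    rng = random.Random(seed)
    sample = sorted(rng.sample(keys, min(sample_size, len(keys))))
    if not sample:
        return []
    # Evenly spaced quantiles of the sample become the range boundaries.
    return [sample[(i * len(sample)) // num_partitions]
            for i in range(1, num_partitions)]


def range_partition_sort(records, num_partitions, key=lambda r: r):
    """Return a list of partitions; concatenating them yields a global sort."""
    keys = [key(r) for r in records]
    boundaries = choose_boundaries(keys, num_partitions)
    partitions = [[] for _ in range(num_partitions)]
    for record in records:
        # All keys in partition i are < boundaries[i] and >= boundaries[i - 1],
        # so partitions are ordered relative to each other.
        index = bisect.bisect_right(boundaries, key(record))
        partitions[index].append(record)
    # Each partition can be sorted independently (in parallel, in principle).
    return [sorted(p, key=key) for p in partitions]


if __name__ == "__main__":
    rng = random.Random(42)
    data = [rng.randint(0, 1000) for _ in range(20)]
    for i, part in enumerate(range_partition_sort(data, 4)):
        print(i, part)
```

Note that the result is only "globally sorted" if the consumer reads partition 0, then partition 1, and so on, which is where the connection to ordered transport comes in. The boundaries here are chosen by the sketch itself, in line with the point that the particular partitions are not of interest to the user.
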
Would it make sense to build into Beam? Maybe. All of the above is about the semantic feature, not a use case for it. Compatibility/parity with other frameworks is not as good a reason. If you already have fixed, bounded partitions as part of your model, and a shuffle that does a global sort, then the feature is just exposing what is already happening to user computation. If you didn't already have this, maybe there are other ways to achieve the results in an end-to-end use.

Kenn

On Tue, May 11, 2021 at 2:51 PM Brian Hulette <[email protected]> wrote:

>
> Global ordering is quite different. I know that SQL and Dataframes have
> global sorting operations ... I imagine some other systems have the
> features so we can look at how it is used?
>
> I looked into this for Spark briefly in the past, since I was curious
> how/if koalas (a pandas-compatible DataFrame API for Spark) implemented
> global sorting/order-sensitive operations.
>
> I came across OrderedRDDFunctions [1] in the Scala API, and found some
> additional background in this SO question [2]. It looks like Spark lets you
> `sortByKey` when a key type implements an ordering, which seems to
> reshuffle into ordered partitions of the keyspace. Would something like
> that be reasonable to build into Beam? Or would it only make sense as part
> of higher-level APIs like SQL and DataFrames?
>
> [1]
> https://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/OrderedRDDFunctions.html
> [2]
> https://stackoverflow.com/questions/29284095/which-operations-preserve-rdd-order
>
> On Tue, May 11, 2021 at 9:19 AM Jan Lukavský <[email protected]> wrote:
>
>> I'll just remind that Beam already supports (experimental)
>> @RequiresTimeSortedInput (which has several limitations, mostly in that it
>> orders only by timestamp and not some - time related - user field; and of
>> course - missing retractions).
>> An arbitrary sorting seems to be hard, even
>> per-key; it seems it will always have to be somewhat time-bounded, as
>> otherwise it might require unbounded states. The batch case on the other
>> hand typically has a way to order inputs arbitrarily with virtually zero
>> cost, as many implementations use sort-merge-group to perform reduction
>> operations.
>>
>> Jan
>> On 5/11/21 5:56 PM, Kenneth Knowles wrote:
>>
>> Per-key ordered delivery makes a ton of sense. I'd guess CDC has the same
>> needs as retractions, so that the changelog can be applied in order as it
>> arrives. And since it is per-key you still get parallelism.
>>
>> Global ordering is quite different. I know that SQL and Dataframes have
>> global sorting operations. The question has always been how does
>> "embarrassingly parallel" processing interact with sorting/ordering. I
>> imagine some other systems have the features so we can look at how it is
>> used?
>>
>> Kenn
>>
>> On Mon, May 10, 2021 at 4:39 PM Sam Rohde <[email protected]> wrote:
>>
>>> Awesome, thanks Pablo!
>>>
>>> On Mon, May 10, 2021 at 4:05 PM Pablo Estrada <[email protected]>
>>> wrote:
>>>
>>>> CDC would also benefit. I am working on a proposal for this that is
>>>> concerned with streaming pipelines, and per-key ordered delivery. I will
>>>> share with you as soon as I have a draft.
>>>> Best
>>>> -P.
>>>>
>>>> On Mon, May 10, 2021 at 2:56 PM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> There has been talk, but nothing concrete.
>>>>>
>>>>> On Mon, May 10, 2021 at 1:42 PM Sam Rohde <[email protected]> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I was wondering if there had been any plans for creating ordered
>>>>>> PCollections in the Beam model? Or if there might be plans for them in
>>>>>> the future?
>>>>>>
>>>>>> I know that Beam SQL and Beam DataFrames would directly benefit from
>>>>>> an ordered PCollection.
>>>>>>
>>>>>> Regards,
>>>>>> Sam
>>>>>>
>>>>>
