This is a good conversation. Some things to consider:

Since Beam is cross-language, the "shufflers" can usually only sort by
binary value (i.e. by the encoded bytes). This is different from other
systems, where custom comparators can be used for sorting. We might need to
introduce an OrderPreservingCoder, and mark the coders that we know preserve
natural order. Some types of values cannot be easily supported here (e.g.
anything without a deterministic Coder). Of course we could always fall back
to in-memory sorting in this case; however, that's somewhat problematic if
the Value list does not fit in memory.
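The sketch below illustrates the byte-ordering point. It is plain Java with no Beam dependency, and the class name OrderPreservingIntEncoding is hypothetical.

It compares two int encodings:

- a naive big-endian two's-complement encoding, and
- the same encoding with the sign bit flipped.

Under the unsigned lexicographic comparison that a byte-sorting shuffler performs, the naive encoding puts -1 after 1. The sign-flipped encoding preserves natural order. This is one possible shape for the guarantee a coder marked as order-preserving would have to make. It is not an existing Beam API.

```java
import java.nio.ByteBuffer;

// Hypothetical helper (not a Beam class): shows why sorting by raw bytes
// needs an order-preserving encoding.
final class OrderPreservingIntEncoding {
  private OrderPreservingIntEncoding() {}

  // Plain big-endian two's complement: negative numbers have a leading 1 bit,
  // so they sort AFTER non-negative numbers when compared as unsigned bytes.
  static byte[] encodeNaive(int value) {
    return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
  }

  // Flipping the sign bit maps Integer.MIN_VALUE..MAX_VALUE onto
  // 0x00000000..0xFFFFFFFF, so unsigned byte order equals signed int order.
  static byte[] encodeOrderPreserving(int value) {
    return ByteBuffer.allocate(Integer.BYTES).putInt(value ^ Integer.MIN_VALUE).array();
  }

  static int decodeOrderPreserving(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getInt() ^ Integer.MIN_VALUE;
  }

  // Unsigned lexicographic comparison: what a shuffler that sorts by binary value does.
  static int compareUnsigned(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }
}
```

Types with no fixed-width, order-preserving encoding, or no deterministic encoding at all, are the ones that would need the in-memory fallback.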

Presumably sorting is only ever per window. For merging windows such as
sessions, simply sorting in the shufflers is not enough. Beam will need to
merge these sorted values whenever windows are merged.
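When two sessions merge, each side's values are already sorted. A linear two-way merge is therefore enough, and nothing has to be re-sorted.

This is a minimal in-memory sketch. It assumes both buffers fit in memory; a runner would presumably stream from the shuffle instead. SortedMerge is a hypothetical helper name.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper: combines the sorted value lists of two windows being merged.
final class SortedMerge {
  private SortedMerge() {}

  // Merges two already-sorted lists in O(n + m). On ties the left element is
  // emitted first, so the merge is stable with respect to (left, right).
  static <T> List<T> merge(List<T> left, List<T> right, Comparator<? super T> cmp) {
    List<T> out = new ArrayList<>(left.size() + right.size());
    int i = 0;
    int j = 0;
    while (i < left.size() && j < right.size()) {
      if (cmp.compare(left.get(i), right.get(j)) <= 0) {
        out.add(left.get(i++));
      } else {
        out.add(right.get(j++));
      }
    }
    // At most one of these loops does any work: copy the remaining tail.
    while (i < left.size()) {
      out.add(left.get(i++));
    }
    while (j < right.size()) {
      out.add(right.get(j++));
    }
    return out;
  }
}
```

When several sessions collapse into one, the same idea generalizes to a k-way merge, for example with a priority queue over the heads of each list.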

The streaming semantics need to be defined. Presumably it means that for
every trigger firing, the list of values in the current pane is sorted.

A common request (especially in streaming) is to support sorting values by
timestamp, not by the full value.
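Sorting by timestamp sidesteps the coder problem. The ordering key is a fixed-width event time that does not depend on the value's Coder, so the value type does not need to be comparable or deterministically encoded.

This is a minimal in-memory sketch under stated assumptions:

- The Timestamped wrapper is a hypothetical stand-in for a windowed value.
- java.time.Instant stands in for Beam's own timestamp type.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class TimestampSort {
  private TimestampSort() {}

  // Hypothetical stand-in for a value paired with its event timestamp.
  static final class Timestamped<V> {
    final V value;
    final Instant timestamp;

    Timestamped(V value, Instant timestamp) {
      this.value = value;
      this.timestamp = timestamp;
    }
  }

  // Orders values by event time only; V needs no Comparator and no deterministic coder.
  static <V> List<V> valuesByTimestamp(List<Timestamped<V>> input) {
    List<Timestamped<V>> copy = new ArrayList<>(input);
    // List.sort is stable, so values with equal timestamps keep their input order.
    copy.sort(Comparator.comparing((Timestamped<V> t) -> t.timestamp));
    List<V> out = new ArrayList<>(copy.size());
    for (Timestamped<V> t : copy) {
      out.add(t.value);
    }
    return out;
  }
}
```

The tie-breaking behavior here comes from the JDK's stable sort. A shuffle-based implementation might not make the same guarantee for equal timestamps.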

Reuven

On Tue, Apr 16, 2019 at 9:08 AM Kenneth Knowles <k...@apache.org> wrote:

> 1. This is clearly useful, and extensively used. Agree with all that. I
> think it can work for batch and streaming equally well if sorting is
> required only per "pane", though I might be overlooking something.
>
> 2. A transform need not be primitive to be well-defined and executed in a
> special way by most runners. For example, Combine.perKey is not a
> "primitive", where primitive means "axiomatic, lacking an expansion to
> other transforms". It has a composite definition in terms of other
> transforms. However, it certainly is standardized / well-defined and
> executed in a custom way by all runners, with the possible exception of
> direct runners (I didn't double check this). To make something a
> standardized well-defined transform it just needs a URN and an explicitly
> documented payload that goes along with the URN (which might be empty).
> Apologies if this is going into details you already know; I just want to
> emphasize that this is a key aspect of Beam's design, avoiding
> proliferation of primitives while allowing runners to optimize execution.
>
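This is a toy model of the "URN plus fallback composite" idea described above. A runner that recognizes the URN substitutes its own implementation; any other runner falls back to the SDK-provided composite expansion.

Several parts of the model are assumptions:

- UrnDispatch and resolve are hypothetical names.
- java.util.function.Function stands in for a transform.
- None of this is Beam's actual runner API.

Only the URN string is taken from this thread.

```java
import java.util.Map;
import java.util.function.Function;

// Toy model (not Beam's API): pick a runner-specific implementation by URN,
// otherwise use the composite fallback that the SDK ships.
final class UrnDispatch {
  private UrnDispatch() {}

  // URN proposed in this thread; not a registered Beam URN.
  static final String GBK_AND_SORT_URN = "beam:transforms:gbk-and-sort-values";

  static <I, O> Function<I, O> resolve(
      String urn,
      Map<String, Function<I, O>> runnerOverrides,
      Function<I, O> compositeFallback) {
    Function<I, O> override = runnerOverrides.get(urn);
    // A runner that does not know the URN still gets a correct result via the fallback.
    return override != null ? override : compositeFallback;
  }
}
```

This fallback is why the transform does not need to be a primitive: correctness comes from the composite definition, and speed from optional runner overrides.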
> In order for GroupByKeyAndSortValues* to have a status analogous to
> Combine.perKey, it needs a URN (say, "beam:transforms:gbk-and-sort-values")
> and a code location where it can have a fallback composite definition. I
> would suggest piloting the idea of making experimental features opt-in
> dependencies with "experimental" in the artifact id, so something like
> artifact id "org.apache.beam:beam-sdks-java-experimental-gbk-and-sort-values"
> (very long, open to improvement). Another idea would be
> "org.apache.beam.experiments" as a group id.
>
> Kenn
>
> *Note that BatchViewOverrides.GroupByKeyAndSortValuesOnly is actually an
> even lower-level primitive; the "Only" part indicates that it is unaware of
> windowing and event time.
>
> On Tue, Apr 16, 2019 at 7:42 AM Gleb Kanterov <g...@spotify.com> wrote:
>
>> At the moment, portability has a GroupByKey transform. In most data
>> processing frameworks, such as Hadoop MR and Apache Spark, there is a
>> concept of secondary sorting during the shuffle phase. Dataflow worker code
>> has it under the name BatchViewOverrides.GroupByKeyAndSortValuesOnly [1];
>> it's a PTransform<PCollection<KV<K1, KV<K2, V>>>,
>> PCollection<KV<K1, Iterable<KV<K2, V>>>>>. It shards by K1 and sorts by
>> K2 within each shard.
>>
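This is an in-memory model of the semantics described above, under stated assumptions:

- The secondary key K2 has already been encoded to bytes, as a cross-language shuffler would see it.
- java.util.Map.Entry is used in place of Beam's KV.

It is written as GroupByKey followed by a per-group sort, i.e. the composite GroupByKey+SortValues form. GroupAndSortModel is a hypothetical name, and this is not the Dataflow runner's code.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: KV<K1, KV<K2, V>> -> KV<K1, Iterable<KV<K2, V>>>, with K2 as encoded bytes.
final class GroupAndSortModel {
  private GroupAndSortModel() {}

  // Pair constructor standing in for Beam's KV.of.
  static <A, B> Map.Entry<A, B> kv(A key, B value) {
    return new SimpleEntry<>(key, value);
  }

  static <K1, V> Map<K1, List<Map.Entry<byte[], V>>> groupByKeyAndSortValues(
      List<Map.Entry<K1, Map.Entry<byte[], V>>> input) {
    Map<K1, List<Map.Entry<byte[], V>>> groups = new LinkedHashMap<>();
    // Step 1 ("GroupByKey"): shard by the primary key K1.
    for (Map.Entry<K1, Map.Entry<byte[], V>> element : input) {
      groups.computeIfAbsent(element.getKey(), k -> new ArrayList<>()).add(element.getValue());
    }
    // Step 2 ("SortValues"): order each group by the unsigned bytes of K2.
    for (List<Map.Entry<byte[], V>> group : groups.values()) {
      group.sort((a, b) -> compareUnsigned(a.getKey(), b.getKey()));
    }
    return groups;
  }

  // Unsigned lexicographic byte comparison, as a byte-sorting shuffler would do.
  static int compareUnsigned(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }
}
```

Because the comparison is unsigned, a key byte of 0x80 sorts after 0x01 and 0x02 rather than before them. A fused implementation would get the same order directly from the shuffle instead of a second pass.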
>> I see a lot of value in adding GroupByKeyAndSort to the list of built-in
>> transforms so that runners can efficiently override it. It's possible to
>> define GroupByKeyAndSort as GroupByKey+SortValues [2]; however, having it
>> as a primitive will open the possibility of a more efficient implementation.
>> What could the potential drawbacks be? I haven't thought much about how it
>> could work for non-batch pipelines.
>>
>> Gleb
>>
>> [1]:
>> https://github.com/spotify/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java#L1246
>> [2]:
>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java
>>
>>
