1. This is clearly useful, and extensively used. Agree with all that. I think it can work for batch and streaming equally well if sorting is required only per "pane", though I might be overlooking something.
2. A transform need not be primitive to be well-defined and executed in a special way by most runners. For example, Combine.perKey is not a "primitive", where primitive means "axiomatic, lacking an expansion to other transforms". It has a composite definition in terms of other transforms. However, it certainly is standardized / well-defined and executed in a custom way by all runners, with the possible exception of direct runners (I didn't double check this). To make something a standardized well-defined transform it just needs a URN and an explicitly documented payload that goes along with the URN (which might be empty). Apologies if this is going into details you already know; I just want to emphasize that this is a key aspect of Beam's design, avoiding proliferation of primitives while allowing runners to optimize execution. In order for GroupByKeyAndSortValues* to have a status analogous to Combine.perKey it needs a URN (say, "beam:transforms:gbk-and-sort-values") and a code location where it can have a fallback composite definition. I would suggest piloting the idea of making experimental features opt-in includes with "experimenta" in the artifact id, so something like artifact id "org.apache.beam:beam-sdks-java-experimental-gbk-and-sort-values" (very long, open to improvement). Another idea would be "org.apache.beam.experiments" as a group id. Kenn *Note that BatchViewOverrides.GroupByKeyAndSortValuesOnly is actually an even lower-level primitive, the "Only" part indicates that it is windowing and event time unaware. On Tue, Apr 16, 2019 at 7:42 AM Gleb Kanterov <g...@spotify.com> wrote: > At the moment, portability has GroupByKey transform. In most data > processing frameworks, such as Hadoop MR and Apache Spark there is a > concept of secondary sorting during the shuffle phase. Dataflow worker code > has it under the name BatchViewOverrides.GroupByKeyAndSortValuesOnly [1], > it's PTransform<PCollection<KV<K1, KV<K2, V>>>, PCollection<KV<K1, > Iterable<KV<K2, V>>>>>. It does sharding by K1 and sorting by K2 within > each shard. > > I see a lot of value in adding GroupByKeyAndSort to the list of built-in > transforms so that runners can efficiently override it. It's possible to > define GroupByKeyAndSort as GroupByKey+SortValues [2], however, having it > as primitive will open the possibility for more efficient implementation. > What could be potential drawbacks? I didn't think much how it could work > for non-bach pipelines. > > Gleb > > [1]: > https://github.com/spotify/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java#L1246 > [2]: > https://github.com/apache/beam/blob/master/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java > >