On Wed, Apr 17, 2019 at 7:48 AM Viliam Durina <vil...@hazelcast.com> wrote:
> > Combine.perKey ... certainly is standardized / well-defined
>
> Is there any document where it's defined?
>

At the user level, here:
https://beam.apache.org/documentation/programming-guide/#combine

There are a few places that define it. Thankfully, they are all automated,
so they have to agree :-)

 - the protos:
   https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L481
 - Combine expand in Java (and perhaps the javadoc):
   https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L1585
 - Combine expand in Python:
   https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/core.py#L1527
 - ValidatesRunner tests (choosing Java, as Python does not define it):
   https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java#L655
   (etc.)

Admittedly, it could use more precise and concise formalization on the one
hand, and more conceptual description for users, independent of language.

Kenn

> Viliam
>
> On Tue, 16 Apr 2019 at 18:27, Kenneth Knowles <k...@apache.org> wrote:
>
>> On Tue, Apr 16, 2019 at 9:18 AM Reuven Lax <re...@google.com> wrote:
>>
>>> A common request (especially in streaming) is to support sorting values
>>> by timestamp, not by the full value.
>>>
>>
>> On this point, I think an explicit secondary key probably addresses the
>> need. Naively implemented, the "sort by values" use case would have a lot
>> of data duplication, so we might have some payload on the transform to
>> configure that, or a couple of related transforms.
>>
>> Kenn
>>
>>>
>>> Reuven
>>>
>>> On Tue, Apr 16, 2019 at 9:08 AM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>> 1. This is clearly useful, and extensively used. Agree with all that. I
>>>> think it can work for batch and streaming equally well if sorting is
>>>> required only per "pane", though I might be overlooking something.
>>>>
>>>> 2. A transform need not be primitive to be well-defined and executed in
>>>> a special way by most runners. For example, Combine.perKey is not a
>>>> "primitive", where primitive means "axiomatic, lacking an expansion to
>>>> other transforms". It has a composite definition in terms of other
>>>> transforms. However, it certainly is standardized / well-defined and
>>>> executed in a custom way by all runners, with the possible exception of
>>>> direct runners (I didn't double-check this). To make something a
>>>> standardized, well-defined transform, it just needs a URN and an
>>>> explicitly documented payload that goes along with the URN (which might
>>>> be empty). Apologies if this is going into details you already know; I
>>>> just want to emphasize that this is a key aspect of Beam's design:
>>>> avoiding proliferation of primitives while allowing runners to optimize
>>>> execution.
>>>>
>>>> In order for GroupByKeyAndSortValues* to have a status analogous to
>>>> Combine.perKey, it needs a URN (say, "beam:transforms:gbk-and-sort-values")
>>>> and a code location where it can have a fallback composite definition. I
>>>> would suggest piloting the idea of making experimental features opt-in
>>>> includes, with "experimental" in the artifact id, so something like
>>>> artifact id "org.apache.beam:beam-sdks-java-experimental-gbk-and-sort-values"
>>>> (very long, open to improvement). Another idea would be
>>>> "org.apache.beam.experiments" as a group id.
>>>>
>>>> Kenn
>>>>
>>>> *Note that BatchViewOverrides.GroupByKeyAndSortValuesOnly is actually
>>>> an even lower-level primitive; the "Only" part indicates that it is
>>>> unaware of windowing and event time.
>>>>
>>>> On Tue, Apr 16, 2019 at 7:42 AM Gleb Kanterov <g...@spotify.com> wrote:
>>>>
>>>>> At the moment, portability has a GroupByKey transform. In most data
>>>>> processing frameworks, such as Hadoop MR and Apache Spark, there is a
>>>>> concept of secondary sorting during the shuffle phase.
>>>>> Dataflow worker code has it under the name
>>>>> BatchViewOverrides.GroupByKeyAndSortValuesOnly [1]; it's a
>>>>> PTransform<PCollection<KV<K1, KV<K2, V>>>, PCollection<KV<K1, Iterable<KV<K2, V>>>>>.
>>>>> It does sharding by K1 and sorting by K2 within each shard.
>>>>>
>>>>> I see a lot of value in adding GroupByKeyAndSort to the list of
>>>>> built-in transforms so that runners can efficiently override it. It's
>>>>> possible to define GroupByKeyAndSort as GroupByKey+SortValues [2];
>>>>> however, having it as a primitive will open the possibility for a more
>>>>> efficient implementation. What could the potential drawbacks be? I
>>>>> haven't thought much about how it could work for non-batch pipelines.
>>>>>
>>>>> Gleb
>>>>>
>>>>> [1]: https://github.com/spotify/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java#L1246
>>>>> [2]: https://github.com/apache/beam/blob/master/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java
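For anyone following along, here is a minimal plain-Java sketch (no Beam dependency) of the semantics being discussed: the composite "fallback" shape of a GroupByKey on the primary key K1 followed by a per-key sort on the secondary key K2 only, i.e. the GroupByKey+SortValues combination Gleb links in [2]. All class and method names are hypothetical, Map.Entry stands in for Beam's KV, and it models one bounded, unwindowed input, so it says nothing about panes or streaming. It also sorts in memory with Comparable, whereas the real sorter extension may order keys differently (for example by their encoded form) and is built to handle large groups. Choosing an event timestamp as K2 gives the "sort by timestamp, not by the full value" case Reuven mentions, since V is never compared.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of GroupByKeyAndSortValues semantics:
// GroupByKey on K1, then sort each group by K2 only. Map.Entry stands in for KV.
// Bounded input, no windowing, no panes: a semantic sketch, not a runner implementation.
final class GbkSortValuesSketch {

  private GbkSortValuesSketch() {}

  // GroupByKey half: collect every (K2, V) pair under its K1.
  // Groups come out in first-seen order of K1 (LinkedHashMap); Beam's
  // GroupByKey itself makes no ordering promise.
  static <K1, K2, V> Map<K1, List<Map.Entry<K2, V>>> groupByKey(
      List<Map.Entry<K1, Map.Entry<K2, V>>> input) {
    Map<K1, List<Map.Entry<K2, V>>> grouped = new LinkedHashMap<>();
    for (Map.Entry<K1, Map.Entry<K2, V>> kv : input) {
      grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
    }
    return grouped;
  }

  // SortValues half: order each group by K2 alone. V is never compared,
  // so it does not need to be Comparable (K2 could be an event timestamp).
  static <K1, K2 extends Comparable<? super K2>, V> Map<K1, List<Map.Entry<K2, V>>> sortValues(
      Map<K1, List<Map.Entry<K2, V>>> grouped) {
    Map<K1, List<Map.Entry<K2, V>>> sorted = new LinkedHashMap<>();
    for (Map.Entry<K1, List<Map.Entry<K2, V>>> group : grouped.entrySet()) {
      List<Map.Entry<K2, V>> values = new ArrayList<>(group.getValue());
      // List.sort is stable, so ties on K2 keep their grouped order in this sketch.
      values.sort((a, b) -> a.getKey().compareTo(b.getKey()));
      sorted.put(group.getKey(), values);
    }
    return sorted;
  }

  // Composite "fallback" definition: GroupByKey followed by SortValues.
  static <K1, K2 extends Comparable<? super K2>, V>
      Map<K1, List<Map.Entry<K2, V>>> groupByKeyAndSortValues(
          List<Map.Entry<K1, Map.Entry<K2, V>>> input) {
    return sortValues(groupByKey(input));
  }
}
```

Feeding it ("user-a", (30, "click")), ("user-b", (5, "view")), ("user-a", (10, "view")), ("user-a", (20, "scroll")) yields user-a -> [(10, view), (20, scroll), (30, click)] and user-b -> [(5, view)]. A runner that recognizes the transform by its URN would be free to replace both steps with a single sorted shuffle, which is the efficiency argument made in the thread.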