On Wed, Apr 17, 2019 at 7:48 AM Viliam Durina <vil...@hazelcast.com> wrote:

> > Combine.perKey ... certainly is standardized / well-defined
>
> Is there any document where it's defined?
>

At the user level, here:
https://beam.apache.org/documentation/programming-guide/#combine

There are a few places that define it. Thankfully, they are all automated
so they have to agree :-)

 - the protos:
https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L481
 - Combine expand in Java:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java#L1585
(and perhaps the javadoc)
 - Combine expand in Python:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/core.py#L1527
 - ValidatesRunner tests (choosing Java as Python does not define it):
https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java#L655
 (etc)

Admittedly, it could use a more precise and concise formalization on the one
hand, and a more conceptual, language-independent description for users on
the other.
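
For a concrete picture of the composite definition, here is a rough sketch of
what the Java expansion boils down to for a simple integer sum (ignoring
hot-key fanout, side inputs, and the other options Combine.PerKey supports;
the real expansion is at the Combine.java link above):

  import org.apache.beam.sdk.transforms.Combine;
  import org.apache.beam.sdk.transforms.GroupByKey;
  import org.apache.beam.sdk.transforms.Sum;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;

  // Roughly what Combine.<String, Integer, Integer>perKey(Sum.ofIntegers())
  // expands to: group by key, then combine each key's grouped values.
  static PCollection<KV<String, Integer>> sumPerKey(
      PCollection<KV<String, Integer>> input) {
    return input
        .apply(GroupByKey.<String, Integer>create())
        .apply(Combine.<String, Integer, Integer>groupedValues(Sum.ofIntegers()));
  }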

Kenn



> Viliam
>
> On Tue, 16 Apr 2019 at 18:27, Kenneth Knowles <k...@apache.org> wrote:
>
>> On Tue, Apr 16, 2019 at 9:18 AM Reuven Lax <re...@google.com> wrote:
>>
>>> A common request (especially in streaming) is to support sorting values
>>> by timestamp, not by the full value.
>>>
>>
>> On this point, I think an explicit secondary key probably addresses the
>> need. Naively implemented, the "sort by values" use case would have a lot
>> of data duplication, so we might have some payload on the transform to
>> configure that, or a couple of related transforms.
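>>
>> (For concreteness, a hypothetical sketch of the re-keying that an explicit
>> secondary key implies, using the element timestamp as the secondary key.
>> "Event" is a placeholder element type, "events" an assumed input
>> PCollection, and the transform that would consume "rekeyed" is the one
>> under discussion, so this is illustrative only.)
>>
>>   import org.apache.beam.sdk.transforms.DoFn;
>>   import org.apache.beam.sdk.transforms.ParDo;
>>   import org.apache.beam.sdk.values.KV;
>>   import org.apache.beam.sdk.values.PCollection;
>>   import org.joda.time.Instant;
>>
>>   // events : PCollection<KV<String, Event>>
>>   PCollection<KV<String, KV<Instant, Event>>> rekeyed =
>>       events.apply(ParDo.of(
>>           new DoFn<KV<String, Event>, KV<String, KV<Instant, Event>>>() {
>>             @ProcessElement
>>             public void process(ProcessContext c) {
>>               // Secondary key = the element's event timestamp.
>>               c.output(KV.of(
>>                   c.element().getKey(),
>>                   KV.of(c.timestamp(), c.element().getValue())));
>>             }
>>           }));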
>>
>> Kenn
>>
>>
>>>
>>> Reuven
>>>
>>> On Tue, Apr 16, 2019 at 9:08 AM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>> 1. This is clearly useful, and extensively used. Agree with all that. I
>>>> think it can work for batch and streaming equally well if sorting is
>>>> required only per "pane", though I might be overlooking something.
>>>>
>>>> 2. A transform need not be primitive to be well-defined and executed in
>>>> a special way by most runners. For example, Combine.perKey is not a
>>>> "primitive", where primitive means "axiomatic, lacking an expansion to
>>>> other transforms". It has a composite definition in terms of other
>>>> transforms. However, it certainly is standardized / well-defined and
>>>> executed in a custom way by all runners, with the possible exception of
>>>> direct runners (I didn't double check this). To make something a
>>>> standardized, well-defined transform, it just needs a URN and an
>>>> explicitly documented payload that goes along with the URN (the payload
>>>> might be empty).
>>>> Apologies if this is going into details you already know; I just want to
>>>> emphasize that this is a key aspect of Beam's design, avoiding
>>>> proliferation of primitives while allowing runners to optimize execution.
>>>>
>>>> In order for GroupByKeyAndSortValues* to have a status analogous to
>>>> Combine.perKey it needs a URN (say, "beam:transforms:gbk-and-sort-values")
>>>> and a code location where it can have a fallback composite definition. I
>>>> would suggest piloting the idea of making experimental features opt-in
>>>> includes with "experimenta" in the artifact id, so something like artifact
>>>> id "org.apache.beam:beam-sdks-java-experimental-gbk-and-sort-values" (very
>>>> long, open to improvement). Another idea would be
>>>> "org.apache.beam.experiments" as a group id.
>>>>
>>>> Kenn
>>>>
>>>> *Note that BatchViewOverrides.GroupByKeyAndSortValuesOnly is actually
>>>> an even lower-level primitive, the "Only" part indicates that it is
>>>> windowing and event time unaware.
>>>>
>>>> On Tue, Apr 16, 2019 at 7:42 AM Gleb Kanterov <g...@spotify.com> wrote:
>>>>
>>>>> At the moment, portability has a GroupByKey transform. In most data
>>>>> processing frameworks, such as Hadoop MR and Apache Spark, there is a
>>>>> concept of secondary sorting during the shuffle phase. The Dataflow
>>>>> worker code has it under the name
>>>>> BatchViewOverrides.GroupByKeyAndSortValuesOnly [1]; it is a
>>>>> PTransform<PCollection<KV<K1, KV<K2, V>>>, PCollection<KV<K1,
>>>>> Iterable<KV<K2, V>>>>> that shards by K1 and sorts by K2 within each
>>>>> shard.
>>>>>
>>>>> I see a lot of value in adding GroupByKeyAndSort to the list of
>>>>> built-in transforms so that runners can efficiently override it. It's
>>>>> possible to define GroupByKeyAndSort as GroupByKey+SortValues [2];
>>>>> however, having it as a primitive would open the possibility of a more
>>>>> efficient implementation. What could the potential drawbacks be? I
>>>>> haven't thought much about how it could work for non-batch pipelines.
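>>>>>
>>>>> (For reference, a rough sketch of that GroupByKey + SortValues [2]
>>>>> composition with concrete types; treat the exact options call as
>>>>> illustrative rather than authoritative.)
>>>>>
>>>>>   import org.apache.beam.sdk.extensions.sorter.BufferedExternalSorter;
>>>>>   import org.apache.beam.sdk.extensions.sorter.SortValues;
>>>>>   import org.apache.beam.sdk.transforms.GroupByKey;
>>>>>   import org.apache.beam.sdk.values.KV;
>>>>>   import org.apache.beam.sdk.values.PCollection;
>>>>>
>>>>>   // input : PCollection<KV<String, KV<Long, String>>>
>>>>>   //           = (primary key, (secondary key, value))
>>>>>   PCollection<KV<String, Iterable<KV<Long, String>>>> groupedAndSorted =
>>>>>       input
>>>>>           .apply(GroupByKey.<String, KV<Long, String>>create())
>>>>>           .apply(SortValues.<String, Long, String>create(
>>>>>               BufferedExternalSorter.options()));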
>>>>>
>>>>> Gleb
>>>>>
>>>>> [1]:
>>>>> https://github.com/spotify/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java#L1246
>>>>> [2]:
>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java
>>>>>
>>>>>
