@Robert @Luke @Jan, could you help take an early look at this PR?
https://github.com/apache/beam/pull/15665

There are some test failures still to be resolved, but they are most likely 
unrelated to the PR, because I observe the same failures across other PRs.



> On Oct 6, 2021, at 11:03 AM, Robert Burke <rob...@frantil.com> wrote:
> 
> The Go SDK handles the urn as unkeyed. 
> 
> That is, reshuffling a PCollection<KV> will ignore the keys and produce a 
> PCollection<KV<int,KV>> with random keys. This would split user keys up 
> across multiple partitions. This is the same as though it were unkeyed.
> 
> Doing anything with the user key specifically would seem to me to defeat the 
> point of a reshuffle, vs just using a GBK which would align keys to bundles 
> in its output.
> 
> 
> On Wed, Oct 6, 2021, 10:54 AM Ke Wu <ke.wu...@gmail.com> wrote:
>> The only usage of the keyed Reshuffle in the Java SDK is for writing files 
>> with a single key, and the use case there would benefit from being replaced 
>> with GroupIntoBatches instead.
> 
> I think there are more use cases for keyed reshuffle. For example, the Samza 
> runner also uses it when elements are rekeyed. In addition, since state is 
> partitioned by key, it is important to reshuffle after a PCollection is 
> assigned a different key, so that elements with the same new key end up in 
> the same partition.
> 
>> I believe the intent was for the existing reshuffle URN to represent the 
>> keyed variant. This was since the unkeyed reshuffle was a composite built on 
>> top of the keyed reshuffle in the Java SDK. The existing overrides in 
>> Flink/Spark/Samza confirm this.
> 
> I believe so, because the Samza/Flink/Spark Reshuffle translators are all 
> authored in Java and expect the keyed Reshuffle<K, V>.
> 
>> I believe the intent was for the existing reshuffle URN to represent the 
>> keyed variant.
>> And from the Python side I thought the intent was for the reshuffle
>> URN to represent the unkeyed variant, as the keyed one isn't anything
>> novel
> 
> 
> This is exactly what is confusing: the same urn currently represents the 
> keyed reshuffle in the Java SDK but the unkeyed reshuffle in the Python SDK. 
> @Luke do you think it makes sense to have two separate urns representing 
> the two different reshuffles? The unkeyed reshuffle would still be expected 
> to be a composite transform built on the keyed one, and runners could decide 
> which (keyed/unkeyed) reshuffle they want to translate.
> 
> Best,
> Ke
> 
>> On Oct 6, 2021, at 10:38 AM, Reuven Lax <re...@google.com> wrote:
>> 
>> I think it's used with the destination as a key, no? In various places 
>> Reshuffle is used as a stand-in for RequiresStableInput.
>> 
>> On Wed, Oct 6, 2021 at 10:07 AM Luke Cwik <lc...@google.com> wrote:
>> I believe the intent was for the existing reshuffle URN to represent the 
>> keyed variant. This was since the unkeyed reshuffle was a composite built on 
>> top of the keyed reshuffle in the Java SDK. The existing overrides in 
>> Flink/Spark/Samza confirm this.
>> 
>> Thinking about this more, I wish we had gone only with the unkeyed variant, 
>> as I don't know how much benefit users get from having their records grouped 
>> by the key they choose, and it also greatly limits the runner's ability to 
>> optimize how the data is materialized.
>> 
>> The only usage of the keyed Reshuffle in the Java SDK is for writing files 
>> with a single key, and the use case there would benefit from being replaced 
>> with GroupIntoBatches instead.
>> 
>> 
>> On Mon, Oct 4, 2021 at 6:31 PM Robert Burke <rob...@frantil.com> wrote:
>> I can handle the Go SDK change once the urn is decided. I'm cleaning up a 
>> change to add the combine_global urn in the Go SDK, so this can slip in 
>> alongside it.
>> 
>> On Mon, Oct 4, 2021, 3:57 PM Ke Wu <ke.wu...@gmail.com> wrote:
>> Created https://issues.apache.org/jira/browse/BEAM-12999 
>> 
>>> On Oct 4, 2021, at 3:37 PM, Robert Bradshaw <rober...@google.com> wrote:
>>> 
>>> Thanks. Happy to help with Python/Go. Do you want to create a JIRA?
>>> 
>>> On Mon, Oct 4, 2021 at 3:33 PM Ke Wu <ke.wu...@gmail.com> wrote:
>>>> 
>>>> Let me add two new urns, one representing reshuffle via random key and 
>>>> one representing reshuffle using key. I will share the PR here later. I 
>>>> would need some help with the Python/Go SDK changes too, since I am not 
>>>> very familiar with them.
>>>> 
>>>> Best,
>>>> Ke
>>>> 
>>>> 
>>>> On Oct 4, 2021, at 3:11 PM, Robert Bradshaw <rober...@google.com> wrote:
>>>> 
>>>> On Mon, Oct 4, 2021 at 3:08 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>> 
>>>> 
>>>> On 10/4/21 11:43 PM, Robert Bradshaw wrote:
>>>> 
>>>> Oh, yes.
>>>> 
>>>> Java Reshuffle.of() == Python ReshufflePerKey()
>>>> Java Reshuffle.viaRandomKey() == Python Reshuffle()
>>>> 
>>>> We generally try to avoid this kind of discrepancy. It could make
>>>> sense to rename Reshuffle.of() to Reshuffle.viaKey().
>>>> 
>>>> 
>>>> I'd suggest Reshuffle.usingKey(), but I'm not a native speaker, so that
>>>> might be opinionated.
>>>> 
>>>> 
>>>> usingKey does sound better. (And, FWIW, usingRandomKey() sounds better
>>>> to me than viaRandomKey(), but probably not worth changing, so the
>>>> question becomes whether to be stilted or consistent.)
>>>> 
>>>> More importantly - could we undeprecate Reshuffle
>>>> (in the Java SDK)? I believe it was deprecated for the wrong reasons - 
>>>> yes, it has undocumented and non-portable side-effects, but it still makes 
>>>> sense for various use-cases (e.g. fan-out, or SDF).
>>>> 
>>>> 
>>>> +1
>>>> 
>>>> 
>>>> On Mon, Oct 4, 2021 at 2:33 PM Ke Wu <ke.wu...@gmail.com> wrote:
>>>> 
>>>> I should have said that the discrepancy lies in the SDKs, not in classic 
>>>> vs. portable.
>>>> 
>>>> Correct me if I am wrong, but the Reshuffle transform in the Java SDK 
>>>> requires the input to be KV [1], while Reshuffle in Python [2] and Go [3] 
>>>> does not.
>>>> 
>>>> 
>>>> [1] 
>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L53
>>>> [2] 
>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730
>>>> [3] 
>>>> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/gbk.go#L122 
>>>> 
>>>> On Oct 4, 2021, at 12:09 PM, Robert Bradshaw <rober...@google.com> wrote:
>>>> 
>>>> Reshuffle is not keyed; there is a separate reshuffle-per-key for
>>>> that. This is true for both Java and Python. This shouldn't depend on
>>>> classic vs. portable mode. It sounds like there's an issue in
>>>> translation.
>>>> 
>>>> On Mon, Oct 4, 2021 at 11:18 AM Ke Wu <ke.wu...@gmail.com> wrote:
>>>> 
>>>> 
>>>> Hello All,
>>>> 
>>>> Recent Samza Runner test failures in python/xlang [1][2] reveal an 
>>>> interesting fact: the Reshuffle transform in a classic pipeline requires 
>>>> the input to be KV, while in a portable pipeline it does not. In portable 
>>>> mode, Reshuffle has an extra step that appends a random key [3].
>>>> 
>>>> This suggests that Reshuffle in classic mode is, sort of, equivalent to 
>>>> ReshufflePerKey in portable mode rather than to Reshuffle itself. A couple 
>>>> of questions on this:
>>>> 
>>>> 1. Is such an SDK/API discrepancy expected?
>>>> 2. If yes, what is the advised approach for runners to implement 
>>>> translators for such transforms?
>>>> 3. If no, is this something we can improve?
>>>> 
>>>> Best,
>>>> Ke
>>>> 
>>>> 
>>>> [1] https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/288/ 
>>>> [2] https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/285/ 
>>>> [3] 
>>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730
>>>> 
>>>> 
>> 
> 
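
For reference, here is a minimal sketch of the two reshuffle variants discussed 
in this thread (reshuffle using the user key vs. reshuffle via a random key). 
It is plain Python with no Beam dependency. The function names and the 
crc32-based partition assignment are assumptions made up for illustration; 
this is not how any Beam SDK or runner actually implements Reshuffle.

```python
import random
import zlib
from collections import defaultdict


def partition_of(key, num_partitions):
    """Hypothetical stand-in for a runner's key-to-partition assignment."""
    return zlib.crc32(repr(key).encode("utf-8")) % num_partitions


def reshuffle_per_key(kvs, num_partitions):
    """Keyed variant: route each (key, value) pair by the user's own key."""
    partitions = defaultdict(list)
    for key, value in kvs:
        partitions[partition_of(key, num_partitions)].append((key, value))
    return dict(partitions)


def reshuffle_via_random_key(elements, num_partitions, rng):
    """Unkeyed variant: wrap each element with a random key and route by it.

    Any user key inside the element is ignored, so elements that share a
    user key may land in different partitions.
    """
    partitions = defaultdict(list)
    for element in elements:
        random_key = rng.randrange(2**32)
        target = partition_of(random_key, num_partitions)
        partitions[target].append((random_key, element))
    return dict(partitions)


if __name__ == "__main__":
    kvs = [("a", 1), ("b", 2), ("a", 3), ("c", 4), ("a", 5)]
    print(reshuffle_per_key(kvs, num_partitions=4))
    print(reshuffle_via_random_key(kvs, num_partitions=4, rng=random.Random(0)))
```

In the keyed variant every pair with user key "a" lands in the same partition, 
which is what stateful processing after a rekey relies on. In the random-key 
variant the user key plays no role in routing.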
