@Robert @Luke @Jan, could you help take an early look at https://github.com/apache/beam/pull/15665? There are some test failures still to be resolved, but they are most likely unrelated to the PR, because I observe the same failures across other PRs.

> On Oct 6, 2021, at 11:03 AM, Robert Burke <rob...@frantil.com> wrote:
>
> The Go SDK handles the URN as unkeyed.
>
> That is, reshuffling a PCollection<KV> will ignore the keys, and produce a
> PCollection<KV<int,KV>> with the random keys. This would split user keys up
> to multiple partitions. This is the same as though it were unkeyed.
>
> Doing anything with the user key specifically would seem to me to defeat the
> point of a reshuffle, vs just using a GBK which would align keys to bundles
> in its output.
>
>
> On Wed, Oct 6, 2021, 10:54 AM Ke Wu <ke.wu...@gmail.com> wrote:
>> The only usage of the keyed Reshuffle in the Java SDK is for writing files
>> with a single key and the use case there would benefit from being replaced
>> with GroupIntoBatches instead.
>
> I think there are more use cases for keyed reshuffle. For example, the Samza
> runner also uses it when elements are rekeyed. In addition, since state is
> partitioned by key, it is important to reshuffle after a PCollection is
> assigned a different key, so that elements with the same new key end up
> in the same partition.
>
>> I believe the intent was for the existing reshuffle URN to represent the
>> keyed variant. This was since the unkeyed reshuffle was a composite built on
>> top of the keyed reshuffle in the Java SDK. The existing overrides in
>> Flink/Spark/Samza confirm this.
>
> I believe so, because the Samza/Flink/Spark Reshuffle translators are all
> authored in Java, which expects keyed Reshuffle<K, V>.
>
>> I believe the intent was for the existing reshuffle URN to represent the
>> keyed variant.
>> And from the Python side I thought the intent was for the reshuffle
>> URN to represent the unkeyed variant, as the keyed one isn't anything
>> novel
>
>
> This is exactly what is confusing: the same URN currently represents
> keyed reshuffle in the Java SDK but unkeyed reshuffle in the Python SDK.
> @Luke do you think it makes sense to have two separate URNs representing
> the two different reshuffles? Unkeyed reshuffle is still expected to be a
> composite transform built on the keyed transform, and runners can decide which
> (keyed/unkeyed) reshuffle they want to translate.
>
> Best,
> Ke
>
>> On Oct 6, 2021, at 10:38 AM, Reuven Lax <re...@google.com> wrote:
>>
>> I think it's used with the destination as a key, no? In various places
>> Reshuffle is used as a stand-in for RequiresStableInput.
>>
>> On Wed, Oct 6, 2021 at 10:07 AM Luke Cwik <lc...@google.com> wrote:
>> I believe the intent was for the existing reshuffle URN to represent the
>> keyed variant. This was since the unkeyed reshuffle was a composite built on
>> top of the keyed reshuffle in the Java SDK. The existing overrides in
>> Flink/Spark/Samza confirm this.
>>
>> Thinking about this more, I wish we had gone only with the unkeyed variant, as
>> I don't know how much benefit users get from having their records grouped by
>> the key they choose, and it also limits the optimization capabilities of the
>> runner a lot as to how to materialize the data.
>>
>> The only usage of the keyed Reshuffle in the Java SDK is for writing files
>> with a single key and the use case there would benefit from being replaced
>> with GroupIntoBatches instead.
>>
>>
>> On Mon, Oct 4, 2021 at 6:31 PM Robert Burke <rob...@frantil.com> wrote:
>> I can handle the Go SDK change once the URN is decided. I'm cleaning up a
>> change to add the combine_global URN in the Go SDK so this can slip in
>> alongside it.
>>
>> On Mon, Oct 4, 2021, 3:57 PM Ke Wu <ke.wu...@gmail.com> wrote:
>> Created https://issues.apache.org/jira/browse/BEAM-12999
>>
>>> On Oct 4, 2021, at 3:37 PM, Robert Bradshaw <rober...@google.com> wrote:
>>>
>>> Thanks.
>>> Happy to help with Python/Go. Do you want to create a JIRA?
>>>
>>> On Mon, Oct 4, 2021 at 3:33 PM Ke Wu <ke.wu...@gmail.com> wrote:
>>>>
>>>> Let me add two new URNs representing reshuffle via random key and
>>>> reshuffle using key. I will share the PR here later; I would need some help
>>>> with the Python/Go SDK changes too, since I am not very familiar with them.
>>>>
>>>> Best,
>>>> Ke
>>>>
>>>>
>>>> On Oct 4, 2021, at 3:11 PM, Robert Bradshaw <rober...@google.com> wrote:
>>>>
>>>> On Mon, Oct 4, 2021 at 3:08 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>
>>>> On 10/4/21 11:43 PM, Robert Bradshaw wrote:
>>>>
>>>> Oh, yes.
>>>>
>>>> Java Reshuffle.of() == Python ReshufflePerKey()
>>>> Java Reshuffle.viaRandomKey() == Python Reshuffle()
>>>>
>>>> We generally try to avoid this kind of discrepancy. It could make
>>>> sense to rename Reshuffle.of() to Reshuffle.viaKey().
>>>>
>>>>
>>>> I'd suggest Reshuffle.usingKey(), but I'm not a native speaker, so that
>>>> might be opinionated.
>>>>
>>>>
>>>> usingKey does sound better. (And, FWIW, usingRandomKey() sounds better
>>>> to me than viaRandomKey(), but probably not worth changing so the
>>>> question becomes whether to be stilted or consistent.)
>>>>
>>>> More importantly - could we undeprecate Reshuffle
>>>> (in Java SDK)? I believe it was deprecated for the wrong reasons - yes, it
>>>> has undocumented and non-portable side-effects, but it still makes sense
>>>> for various use-cases (e.g. fan-out, or SDF).
>>>>
>>>>
>>>> +1
>>>>
>>>>
>>>> On Mon, Oct 4, 2021 at 2:33 PM Ke Wu <ke.wu...@gmail.com> wrote:
>>>>
>>>> I should have said that the discrepancy lives in the SDK, not in Classic
>>>> vs Portable.
>>>>
>>>> Correct me if I am wrong, Reshuffle transform in Java SDK requires the
>>>> input to be KV [1] while Reshuffle in Python [2] and Go [3] does not.
>>>>
>>>>
>>>> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L53
>>>> [2] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730
>>>> [3] https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/gbk.go#L122
>>>>
>>>> On Oct 4, 2021, at 12:09 PM, Robert Bradshaw <rober...@google.com> wrote:
>>>>
>>>> Reshuffle is not keyed, there is a separate reshuffle-per-key for
>>>> that. This is true for both Java and Python. This shouldn't depend on
>>>> classic vs. portable mode. It sounds like there's an issue in
>>>> translation.
>>>>
>>>> On Mon, Oct 4, 2021 at 11:18 AM Ke Wu <ke.wu...@gmail.com> wrote:
>>>>
>>>>
>>>> Hello All,
>>>>
>>>> Recent Samza Runner test failures in python/xlang [1][2] reveal an
>>>> interesting fact: the Reshuffle transform in a classic pipeline requires the
>>>> input to be KV while in a portable pipeline it does not; Reshuffle in
>>>> portable mode has an extra step to append a random key [3].
>>>>
>>>> This suggests that Reshuffle in classic mode is, sort of, equivalent to
>>>> ReshufflePerKey in portable mode instead of Reshuffle itself. A couple of
>>>> questions on this:
>>>>
>>>> 1. Is such an SDK/API discrepancy expected?
>>>> 2. If yes, what is the advised approach for runners to implement
>>>> translators for such transforms?
>>>> 3. If no, is this something we can improve?
>>>>
>>>> Best,
>>>> Ke
>>>>
>>>>
>>>> [1] https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/288/
>>>> [2] https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/285/
>>>> [3] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730
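
For anyone following the keyed vs. unkeyed distinction discussed in this thread, here is a minimal pure-Python sketch of the two behaviors. It does not use Beam or any runner API: `partition_of`, `reshuffle_per_key`, `reshuffle_via_random_key`, and `NUM_PARTITIONS` are hypothetical names made up for illustration, and hashing a key into a partition index is only an assumed stand-in for whatever a real runner's shuffle does.

```python
import random
from collections import defaultdict

NUM_PARTITIONS = 4  # hypothetical partition count, chosen only for illustration


def partition_of(key, num_partitions=NUM_PARTITIONS):
    """Map a key to a partition index (assumed stand-in for a runner's shuffle)."""
    return hash(key) % num_partitions


def reshuffle_per_key(kvs, num_partitions=NUM_PARTITIONS):
    """Keyed variant: route each (key, value) pair by its own user key."""
    partitions = defaultdict(list)
    for key, value in kvs:
        partitions[partition_of(key, num_partitions)].append((key, value))
    return dict(partitions)


def reshuffle_via_random_key(elements, num_partitions=NUM_PARTITIONS, seed=None):
    """Unkeyed variant: attach a random int key, route by it, then drop it.

    Any user key inside an element is ignored for routing purposes.
    """
    rng = random.Random(seed)
    partitions = defaultdict(list)
    for element in elements:
        random_key = rng.randrange(2**32)
        partitions[partition_of(random_key, num_partitions)].append(element)
    return dict(partitions)


if __name__ == "__main__":
    kvs = [("a", 1), ("b", 2), ("a", 3), ("c", 4), ("a", 5)]
    # Keyed: every pair with user key "a" lands in a single partition.
    print(reshuffle_per_key(kvs))
    # Unkeyed: pairs with user key "a" may be spread over several partitions.
    print(reshuffle_via_random_key(kvs))
```

Under these assumptions, the keyed variant always sends pairs that share a user key to the same partition, while the random-key variant gives no such guarantee. That difference is why a runner that partitions state by key cares about which of the two behaviors a given URN denotes.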