> The only usage of the keyed Reshuffle in the Java SDK is for write files
> with a single key and the use case there would benefit from being replaced
> with GroupIntoBatches instead.
I think there are more use cases for keyed reshuffle. For example, the Samza runner also uses it when elements are rekeyed. In addition, since state is partitioned by key, it is important to reshuffle after a PCollection is assigned a different key, so that elements with the same new key end up in the same partition.

> I believe the intent was for the existing reshuffle URN to represent the
> keyed variant. This was since the unkeyed reshuffle was a composite built on
> top of the keyed reshuffle in the Java SDK. The existing overrides in
> Flink/Spark/Samza confirm this.

I believe so, because the Reshuffle translators in Samza/Flink/Spark are all authored in Java, which expects the keyed Reshuffle<K, V>.

> I believe the intent was for the existing reshuffle URN to represent the
> keyed variant.

> And from the Python side I thought the intent was for the reshuffle
> URN to represent the unkeyed variant, as the keyed one isn't anything
> novel

This is exactly what is confusing: the same URN currently represents keyed reshuffle in the Java SDK but unkeyed reshuffle in the Python SDK.

@Luke, do you think it makes sense to have two separate URNs representing the two different reshuffles? Unkeyed reshuffle would still be expected to be a composite transform built on the keyed transform, and runners could decide which (keyed/unkeyed) reshuffle they want to translate.

Best,
Ke

> On Oct 6, 2021, at 10:38 AM, Reuven Lax <re...@google.com> wrote:
>
> I think it's used with the destination as a key, no? In various places
> Reshuffle is used as a standin for RequiresStableInput
>
> On Wed, Oct 6, 2021 at 10:07 AM Luke Cwik <lc...@google.com> wrote:
> I believe the intent was for the existing reshuffle URN to represent the
> keyed variant. This was since the unkeyed reshuffle was a composite built on
> top of the keyed reshuffle in the Java SDK. The existing overrides in
> Flink/Spark/Samza confirm this.
>
> Thinking about this more I wish we had gone only with the unkeyed variant as
> I don't know how much benefit users get from having their records grouped by
> the key they choose and it also limits the optimization capabilities of the
> runner a lot as to how to materialize the data.
>
> The only usage of the keyed Reshuffle in the Java SDK is for write files
> with a single key and the use case there would benefit from being replaced
> with GroupIntoBatches instead.
>
>
> On Mon, Oct 4, 2021 at 6:31 PM Robert Burke <rob...@frantil.com> wrote:
> I can handle the Go SDK change once the urn is decided. I'm cleaning up a
> change to add the combine_global urn in the Go SDK so this can slip in
> alongside it.
>
> On Mon, Oct 4, 2021, 3:57 PM Ke Wu <ke.wu...@gmail.com> wrote:
> Created https://issues.apache.org/jira/browse/BEAM-12999
>
>> On Oct 4, 2021, at 3:37 PM, Robert Bradshaw <rober...@google.com> wrote:
>>
>> Thanks. Happy to help with Python/Go. Do you want to create a JIRA?
>>
>> On Mon, Oct 4, 2021 at 3:33 PM Ke Wu <ke.wu...@gmail.com> wrote:
>>>
>>> Let me add two new urns representing reshuffle via random key and reshuffle
>>> using key. I will share the PR later here, would need some help on
>>> Python/Go SDK changes too since I am not very familiar with them.
>>>
>>> Best,
>>> Ke
>>>
>>>
>>> On Oct 4, 2021, at 3:11 PM, Robert Bradshaw <rober...@google.com> wrote:
>>>
>>> On Mon, Oct 4, 2021 at 3:08 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>
>>> On 10/4/21 11:43 PM, Robert Bradshaw wrote:
>>>
>>> Oh, yes.
>>>
>>> Java Reshuffle.of() = Python ReshufflePerKey()
>>> Java Reshuffle.viaRandomKey() == Python Reshuffle()
>>>
>>> We generally try to avoid this kind of discrepancy.
>>> It could make sense to rename Reshuffle.of() to Reshuffle.viaKey().
>>>
>>>
>>> I'd suggest Reshuffle.usingKey(), but I'm not a native speaker, so that
>>> might be opinionated.
>>>
>>>
>>> usingKey does sound better. (And, FWIW, usingRandomKey() sounds better
>>> to me than viaRandomKey(), but probably not worth changing so the
>>> question becomes whether to be stilted or consistent.)
>>>
>>> More importantly - could we undeprecate Reshuffle
>>> (in Java SDK)? I believe it was deprecated for the wrong reasons - yes, it
>>> has undocumented and non-portable side-effects, but it still makes sense
>>> for various use-cases (e.g. fan-out, or SDF).
>>>
>>>
>>> +1
>>>
>>>
>>> On Mon, Oct 4, 2021 at 2:33 PM Ke Wu <ke.wu...@gmail.com> wrote:
>>>
>>> I should have said that the discrepancy lives in the SDK, not Classic vs Portable.
>>>
>>> Correct me if I am wrong, the Reshuffle transform in the Java SDK requires the
>>> input to be KV [1] while Reshuffle in Python [2] and Go [3] does not.
>>>
>>>
>>> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L53
>>> [2] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730
>>> [3] https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/gbk.go#L122
>>>
>>> On Oct 4, 2021, at 12:09 PM, Robert Bradshaw <rober...@google.com> wrote:
>>>
>>> Reshuffle is not keyed, there is a separate reshuffle-per-key for
>>> that. This is true for both Java and Python. This shouldn't depend on
>>> classic vs. portable mode. It sounds like there's an issue in
>>> translation.
>>>
>>> On Mon, Oct 4, 2021 at 11:18 AM Ke Wu <ke.wu...@gmail.com> wrote:
>>>
>>>
>>> Hello All,
>>>
>>> Recent Samza Runner test failures in python/xlang [1][2] reveal an
>>> interesting fact: the Reshuffle transform in a classic pipeline requires the
>>> input to be KV while a portable pipeline does not, because Reshuffle in
>>> portable mode has an extra step that appends a random key [3].
>>>
>>> This suggests that Reshuffle in classic mode is, sort of, equivalent to
>>> ReshufflePerKey in portable mode instead of Reshuffle itself. A couple of
>>> questions on this:
>>>
>>> 1. Is such an SDK/API discrepancy expected?
>>> 2. If yes, what is the advised approach for runners to implement
>>> translators for such transforms?
>>> 3. If no, is this something we can improve?
>>>
>>> Best,
>>> Ke
>>>
>>>
>>> [1] https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/288/
>>> [2] https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/285/
>>> [3] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730
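
For readers following the keyed vs. unkeyed distinction discussed in this thread, here is a minimal sketch in plain Python of how the two relate. It does not use the Beam API: `keyed_reshuffle`, `unkeyed_reshuffle`, and the `num_partitions` parameter are hypothetical names chosen for illustration, and hashing a key into a partition is only an assumption about how a runner might redistribute data. It shows the unkeyed variant as a composite that attaches a random key, runs the keyed variant, and drops the key again.

```python
import random
from collections import defaultdict


def keyed_reshuffle(pairs, num_partitions):
    """Illustrative keyed reshuffle: equal keys always share a partition."""
    partitions = defaultdict(list)
    for key, value in pairs:
        # Assumption: the partition is derived from the key's hash.
        partitions[hash(key) % num_partitions].append((key, value))
    return dict(partitions)


def unkeyed_reshuffle(values, num_partitions, rng=None):
    """Illustrative unkeyed reshuffle built on top of the keyed one."""
    rng = rng or random.Random()
    # Step 1: attach a random key to every element.
    keyed = [(rng.randrange(num_partitions), value) for value in values]
    # Step 2: redistribute with the keyed reshuffle.
    shuffled = keyed_reshuffle(keyed, num_partitions)
    # Step 3: drop the synthetic key so callers see plain values again.
    return {p: [value for _, value in kvs] for p, kvs in shuffled.items()}
```

In this sketch the keyed variant requires (key, value) input, as described for Java's Reshuffle.of(), while the unkeyed variant accepts arbitrary elements, as described for Python's Reshuffle().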