> The only usage of the keyed Reshuffle in the Java SDK is for writing files
> with a single key, and that use case would benefit from being replaced
> with GroupIntoBatches instead.

I think there are more use cases for the keyed reshuffle. For example, the Samza
runner also uses it when elements are rekeyed: since state is partitioned by
key, it is important to reshuffle after a PCollection is assigned a new key, so
that elements with the same new key end up in the same partition.
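To illustrate why the shuffle matters after rekeying, here is a toy model in plain Python, not Beam or Samza code. The stable-hash partitioner, the partition count and all names (`partition_for`, `reshuffle_per_key`, the sample events) are assumptions for illustration only; a real runner picks its own partitioning scheme.

```python
import zlib
from typing import Hashable, List, Tuple

KV = Tuple[Hashable, object]


def partition_for(key: Hashable, num_partitions: int) -> int:
    # Hypothetical key-based partitioner: a stable hash of the key modulo the
    # partition count (real runners choose their own scheme).
    return zlib.crc32(repr(key).encode("utf-8")) % num_partitions


def reshuffle_per_key(partitions: List[List[KV]], num_partitions: int) -> List[List[KV]]:
    # Keyed reshuffle in this model: move every (key, value) pair to the
    # partition that owns its *current* key.
    out: List[List[KV]] = [[] for _ in range(num_partitions)]
    for part in partitions:
        for key, value in part:
            out[partition_for(key, num_partitions)].append((key, value))
    return out


NUM_PARTITIONS = 3
# (user_id, country) pairs, first partitioned by user id.
events = [("u1", "US"), ("u2", "DE"), ("u3", "US"), ("u4", "DE"), ("u5", "FR")]
by_user = reshuffle_per_key([events], NUM_PARTITIONS)

# Rekey by country. Without a shuffle, elements stay where their old key put
# them, so two "US" elements may sit in different partitions (and would see
# different copies of per-key state).
rekeyed = [[(country, user) for user, country in part] for part in by_user]

# A keyed reshuffle restores the invariant: each key lives in one partition.
by_country = reshuffle_per_key(rekeyed, NUM_PARTITIONS)
```

After the second reshuffle every country key is held by exactly one partition, which is the property key-partitioned state relies on.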

> I believe the intent was for the existing reshuffle URN to represent the 
> keyed variant. This was since the unkeyed reshuffle was a composite built on 
> top of the keyed reshuffle in the Java SDK. The existing overrides in 
> Flink/Spark/Samza confirm this.

I believe so, because the Samza/Flink/Spark Reshuffle translators are all
written in Java and expect the keyed Reshuffle<K, V>.

> I believe the intent was for the existing reshuffle URN to represent the 
> keyed variant.
> And from the Python side I thought the intent was for the reshuffle
> URN to represent the unkeyed variant, as the keyed one isn't anything
> novel


This is exactly what is confusing: the same URN currently represents the keyed
reshuffle in the Java SDK but the unkeyed reshuffle in the Python SDK.
@Luke, do you think it makes sense to have two separate URNs representing the
two different reshuffles? The unkeyed reshuffle would still be expected to be a
composite transform built on the keyed one, and runners could decide which
(keyed/unkeyed) reshuffle they want to translate.
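A toy model of that composite in plain Python (not Beam code; the function names, the random-key width and the hash partitioning are illustrative assumptions): the unkeyed variant only adds and removes a random key around the keyed one, so a runner that translates just the keyed reshuffle can still run the unkeyed one by expanding it.

```python
import random
import zlib
from typing import Hashable, List, Tuple


def reshuffle_per_key(partitions: List[List[Tuple[Hashable, object]]],
                      num_partitions: int) -> List[List[Tuple[Hashable, object]]]:
    # Keyed reshuffle in this toy model: route each pair by a stable key hash.
    out: List[List[Tuple[Hashable, object]]] = [[] for _ in range(num_partitions)]
    for part in partitions:
        for key, value in part:
            out[zlib.crc32(repr(key).encode("utf-8")) % num_partitions].append((key, value))
    return out


def reshuffle_unkeyed(partitions: List[List[object]], num_partitions: int,
                      rng: random.Random) -> List[List[object]]:
    # 1. Attach a random key to each element (the "via random key" step).
    keyed = [[(rng.getrandbits(32), v) for v in part] for part in partitions]
    # 2. Delegate to the keyed reshuffle.
    shuffled = reshuffle_per_key(keyed, num_partitions)
    # 3. Strip the random keys so callers get back plain elements.
    return [[v for _, v in part] for part in shuffled]


result = reshuffle_unkeyed([[1, 2, 3], [4, 5], [6]], num_partitions=3, rng=random.Random(0))
```

In this model the keyed variant is the primitive a runner has to implement; the unkeyed one costs only the two extra map steps around it.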

Best,
Ke

> On Oct 6, 2021, at 10:38 AM, Reuven Lax <re...@google.com> wrote:
> 
> I think it's used with the destination as a key, no? In various places
> Reshuffle is used as a stand-in for RequiresStableInput.
> 
> On Wed, Oct 6, 2021 at 10:07 AM Luke Cwik <lc...@google.com> wrote:
> I believe the intent was for the existing reshuffle URN to represent the 
> keyed variant. This was since the unkeyed reshuffle was a composite built on 
> top of the keyed reshuffle in the Java SDK. The existing overrides in 
> Flink/Spark/Samza confirm this.
> 
> Thinking about this more, I wish we had gone with only the unkeyed variant, as
> I don't know how much benefit users get from having their records grouped by
> the key they choose, and it also greatly limits the runner's ability to
> optimize how the data is materialized.
> 
> The only usage of the keyed Reshuffle in the Java SDK is for writing files
> with a single key, and that use case would benefit from being replaced
> with GroupIntoBatches instead.
> 
> 
> On Mon, Oct 4, 2021 at 6:31 PM Robert Burke <rob...@frantil.com> wrote:
> I can handle the Go SDK change once the URN is decided. I'm cleaning up a
> change to add the combine_global URN in the Go SDK, so this can slip in
> alongside it.
> 
> On Mon, Oct 4, 2021, 3:57 PM Ke Wu <ke.wu...@gmail.com> wrote:
> Created https://issues.apache.org/jira/browse/BEAM-12999 
> 
>> On Oct 4, 2021, at 3:37 PM, Robert Bradshaw <rober...@google.com> wrote:
>> 
>> Thanks. Happy to help with Python/Go. Do you want to create a JIRA?
>> 
>> On Mon, Oct 4, 2021 at 3:33 PM Ke Wu <ke.wu...@gmail.com> wrote:
>>> 
>>> Let me add two new URNs, one representing reshuffle via a random key and one
>>> representing reshuffle using a key. I will share the PR here later; I would
>>> need some help with the Python/Go SDK changes too, since I am not very
>>> familiar with them.
>>> 
>>> Best,
>>> Ke
>>> 
>>> 
>>> On Oct 4, 2021, at 3:11 PM, Robert Bradshaw <rober...@google.com> wrote:
>>> 
>>> On Mon, Oct 4, 2021 at 3:08 PM Jan Lukavský <je...@seznam.cz> wrote:
>>> 
>>> 
>>> On 10/4/21 11:43 PM, Robert Bradshaw wrote:
>>> 
>>> Oh, yes.
>>> 
>>> Java Reshuffle.of() == Python ReshufflePerKey()
>>> Java Reshuffle.viaRandomKey() == Python Reshuffle()
>>> 
>>> We generally try to avoid this kind of discrepancy. It could make
>>> sense to rename Reshuffle.of() to Reshuffle.viaKey().
>>> 
>>> 
>>> I'd suggest Reshuffle.usingKey(), but I'm not a native speaker, so that
>>> might be a matter of opinion.
>>> 
>>> 
>>> usingKey does sound better. (And, FWIW, usingRandomKey() sounds better
>>> to me than viaRandomKey(), but it is probably not worth changing, so the
>>> question becomes whether to be stilted or consistent.)
>>> 
>>> More importantly, could we undeprecate Reshuffle
>>> (in the Java SDK)? I believe it was deprecated for the wrong reasons: yes, it
>>> has undocumented and non-portable side effects, but it still makes sense
>>> for various use cases (e.g. fan-out, or SDF).
>>> 
>>> 
>>> +1
>>> 
>>> 
>>> On Mon, Oct 4, 2021 at 2:33 PM Ke Wu <ke.wu...@gmail.com> wrote:
>>> 
>>> I should have said that the discrepancy lies between the SDKs, not between classic and portable mode.
>>> 
>>> Correct me if I am wrong, but the Reshuffle transform in the Java SDK requires
>>> its input to be KV [1], while Reshuffle in Python [2] and Go [3] does not.
>>> 
>>> 
>>> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L53
>>> [2] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730
>>> [3] https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/gbk.go#L122
>>> 
>>> On Oct 4, 2021, at 12:09 PM, Robert Bradshaw <rober...@google.com> wrote:
>>> 
>>> Reshuffle is not keyed; there is a separate reshuffle-per-key for
>>> that. This is true for both Java and Python. This shouldn't depend on
>>> classic vs. portable mode. It sounds like there's an issue in
>>> translation.
>>> 
>>> On Mon, Oct 4, 2021 at 11:18 AM Ke Wu <ke.wu...@gmail.com> wrote:
>>> 
>>> 
>>> Hello All,
>>> 
>>> Recent Samza runner test failures in Python/xlang [1][2] reveal an
>>> interesting fact: the Reshuffle transform in a classic pipeline requires its
>>> input to be KV, while in a portable pipeline it does not, because Reshuffle
>>> in portable mode has an extra step that appends a random key [3].
>>> 
>>> This suggests that Reshuffle in classic mode is, sort of, equivalent to
>>> ReshufflePerKey in portable mode rather than to Reshuffle itself. A couple of
>>> questions on this:
>>> 
>>> 1. Is such an SDK/API discrepancy expected?
>>> 2. If yes, what is the advised approach for runners to implement
>>> translators for such transforms?
>>> 3. If no, is this something we can improve?
>>> 
>>> Best,
>>> Ke
>>> 
>>> 
>>> [1] https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/288/
>>> [2] https://ci-beam.apache.org/job/beam_PostCommit_XVR_Samza/285/
>>> [3] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/util.py#L730
>>> 
>>> 
> 
