On Sat, May 19, 2018 at 10:55 PM Robert Bradshaw <rober...@google.com> wrote:
> On Sat, May 19, 2018 at 6:27 PM Raghu Angadi <rang...@google.com> wrote: > >> On Sat, May 19, 2018 at 8:11 AM Robert Bradshaw <rober...@google.com> >> wrote: >> >>> On Fri, May 18, 2018 at 6:29 PM Raghu Angadi <rang...@google.com> wrote: >>> >>>> True. I am still failing to see what is broken about Reshuffle that is >>>> also not broken with GroupByKey transform. If someone depends on GroupByKey >>>> to get stable input, isn't that equally incorrect/unportable? >>>> >>> >>> Yes, if people use GBK in that way, it's also just as broken. The >>> thought is that fewer people would use it with that intent, as GBK is not a >>> no-op (it transforms the shape of the data, and also does not preserve >>> windowing). This is in contrast to Reshuffle which was encouraged for this >>> usecase. >>> >> >> I see. I am not aware of any recommendation for users (excluding advanced >> users) to use this for stable input/durability gaurantees. Every single >> case where I recommended Reshuffle was related to parallelism (there were >> many such cases). Most of use of Reshuflle/GBK for stable input were done >> consciously by the authors, fully aware of the caveats (SDF in Dataflow, >> Kafka EOS sink use of GBK, etc). >> >> As a result, deprecation is only hurting the innocent users who are using >> Reshuffle correctly. >> >> I think it would be much more user friendly to un-deprecate it to add a >> warning for advanced users about non-portability of durability/replay >> guarantees/stable input assumptions. >> >>> > Yes, I think everyone in this thread is in agreement here. We should > provide a *different* transform that provides the durability guarantees > (with caveats). In the meantime, this delegating to a reshuffle would be > better than using a reshuffle directly. > The different thing has already been discussed on the dev thread - it's the RequresStableInput property on a DoFn's process method. The annotation has even been added already (for Java), it just has never been hooked up. > > >> We tend to put in reshuffles in order to "commit" these random values and >>>>>>> make them stable for the next stage, to be used to provide the needed >>>>>>> idempotency for sinks. >>>>>>> >>>>>> >>>>>> In such cases, I think the author should error out on the runner that >>>>>> don't provide that guarantee. That is what ExactlyOnceSink in KafkaIO >>>>>> does >>>>>> [1]. >>>>>> >>>>>> [1] >>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1049 >>>>>> >>>>> >>>>> We're moving to a world where the runner may not be known at pipeline >>>>> construction time. However, explicitly using a (distinct) >>>>> make-input-stable >>>>> transform when that's the intent (which could be a primitive that runners >>>>> should implement, possibly by swapping in Reshuffle, or reject) would >>>>> allow >>>>> for this. That being said, the exact semantics of this transform is a bit >>>>> of a rabbit hole which is why we never finished the job of deprecating >>>>> Reshuffle. This is a case where doing something is better than doing >>>>> nothing, and our use of URNs for this kind of thing is flexible enough >>>>> that >>>>> we can deprecate old ones if/when we have time to pound out the right >>>>> solution. >>>>> >>>>> >>>>>> >>>>>>> Kenn >>>>>>> >>>>>>> On Fri, May 18, 2018 at 4:05 PM Raghu Angadi <rang...@google.com> >>>>>>> wrote: >>>>>>> >>>>>>>> >>>>>>>> On Fri, May 18, 2018 at 12:21 PM Robert Bradshaw < >>>>>>>> rober...@google.com> wrote: >>>>>>>> >>>>>>>>> On Fri, May 18, 2018 at 11:46 AM Raghu Angadi <rang...@google.com> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Thanks Kenn. >>>>>>>>>> >>>>>>>>>> On Fri, May 18, 2018 at 11:02 AM Kenneth Knowles <k...@google.com> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> The fact that its usage has grown probably indicates that we >>>>>>>>>>> have a large number of transforms that can easily cause data loss / >>>>>>>>>>> duplication. >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Is this specific to Reshuffle or it is true for any GroupByKey? I >>>>>>>>>> see Reshuffle as just a wrapper around GBK. >>>>>>>>>> >>>>>>>>> The issue is when it's used in such a way that data corruption can >>>>>>>>> occur when the underlying GBK output is not stable. >>>>>>>>> >>>>>>>> >>>>>>>> Could you describe this breakage bit more in detail or give a >>>>>>>> example? Apologies in advance, I know this came up in multiple >>>>>>>> contexts in >>>>>>>> the past, but I haven't grokked the issue well. It is the window >>>>>>>> rewrite >>>>>>>> that Reshuffle does that causes misuse of GBK? >>>>>>>> >>>>>>>> Thanks. >>>>>>>> >>>>>>>