Re: What is the future of Reshuffle?
On Sat, May 19, 2018 at 6:27 PM Raghu Angadi wrote:

> On Sat, May 19, 2018 at 8:11 AM Robert Bradshaw wrote:
>
>> On Fri, May 18, 2018 at 6:29 PM Raghu Angadi wrote:
>>
>>> True. I am still failing to see what is broken about Reshuffle that is
>>> not also broken with the GroupByKey transform. If someone depends on
>>> GroupByKey to get stable input, isn't that equally incorrect/unportable?
>>
>> Yes, if people use GBK in that way, it's also just as broken. The thought
>> is that fewer people would use it with that intent, as GBK is not a no-op
>> (it transforms the shape of the data, and also does not preserve
>> windowing). This is in contrast to Reshuffle, which was encouraged for
>> this use case.
>
> I see. I am not aware of any recommendation for users (excluding advanced
> users) to use this for stable input/durability guarantees. Every single
> case where I recommended Reshuffle was related to parallelism (there were
> many such cases). Most uses of Reshuffle/GBK for stable input were made
> consciously by the authors, fully aware of the caveats (SDF in Dataflow,
> Kafka EOS sink use of GBK, etc).
>
> As a result, deprecation is only hurting the innocent users who are using
> Reshuffle correctly.
>
> I think it would be much more user friendly to un-deprecate it and add a
> warning for advanced users about non-portability of durability/replay
> guarantees/stable input assumptions.

Yes, I think everyone in this thread is in agreement here. We should provide a *different* transform that provides the durability guarantees (with caveats). In the meantime, having that transform delegate to a Reshuffle would be better than using a Reshuffle directly.

>>>>>> We tend to put in reshuffles in order to "commit" these random values
>>>>>> and make them stable for the next stage, to be used to provide the
>>>>>> needed idempotency for sinks.
>>>>>
>>>>> In such cases, I think the author should error out on runners that
>>>>> don't provide that guarantee. That is what ExactlyOnceSink in KafkaIO
>>>>> does [1].
>>>>>
>>>>> [1] https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1049
>>>>
>>>> We're moving to a world where the runner may not be known at pipeline
>>>> construction time. However, explicitly using a (distinct)
>>>> make-input-stable transform when that's the intent (which could be a
>>>> primitive that runners should implement, possibly by swapping in
>>>> Reshuffle, or reject) would allow for this. That being said, the exact
>>>> semantics of this transform are a bit of a rabbit hole, which is why we
>>>> never finished the job of deprecating Reshuffle. This is a case where
>>>> doing something is better than doing nothing, and our use of URNs for
>>>> this kind of thing is flexible enough that we can deprecate old ones
>>>> if/when we have time to pound out the right solution.
>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Fri, May 18, 2018 at 4:05 PM Raghu Angadi wrote:
>>>>>>
>>>>>>> On Fri, May 18, 2018 at 12:21 PM Robert Bradshaw <rober...@google.com> wrote:
>>>>>>>
>>>>>>>> On Fri, May 18, 2018 at 11:46 AM Raghu Angadi wrote:
>>>>>>>>
>>>>>>>>> Thanks Kenn.
>>>>>>>>>
>>>>>>>>> On Fri, May 18, 2018 at 11:02 AM Kenneth Knowles wrote:
>>>>>>>>>
>>>>>>>>>> The fact that its usage has grown probably indicates that we have
>>>>>>>>>> a large number of transforms that can easily cause data loss /
>>>>>>>>>> duplication.
>>>>>>>>>
>>>>>>>>> Is this specific to Reshuffle or is it true for any GroupByKey? I
>>>>>>>>> see Reshuffle as just a wrapper around GBK.
>>>>>>>>
>>>>>>>> The issue is when it's used in such a way that data corruption can
>>>>>>>> occur when the underlying GBK output is not stable.
>>>>>>>
>>>>>>> Could you describe this breakage in a bit more detail or give an
>>>>>>> example? Apologies in advance, I know this came up in multiple
>>>>>>> contexts in the past, but I haven't grokked the issue well. Is it the
>>>>>>> window rewrite that Reshuffle does that causes misuse of GBK?
>>>>>>>
>>>>>>> Thanks.
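For illustration, the "implement it, possibly by swapping in Reshuffle, or reject" idea discussed in this message can be modeled in a few lines. This is a toy sketch in plain Python, not Beam code: the URN string, the `Runner` class, its `gbk_output_is_stable` flag, and `resolve_stable_input` are all hypothetical names invented here, and the thread itself leaves the exact semantics of such a transform open.

```python
from dataclasses import dataclass

# Hypothetical URN invented for this sketch; it is not a real Beam URN.
STABLE_INPUT_URN = "example:transform:require_stable_input:v1"


@dataclass(frozen=True)
class Runner:
    """Toy description of a runner (hypothetical, not a Beam class)."""
    name: str
    gbk_output_is_stable: bool  # assumption: a shuffle durably commits its output


class UnsupportedTransformError(Exception):
    """Raised when a runner cannot honor the stable-input request."""


def resolve_stable_input(runner: Runner) -> str:
    """Decide how a given runner handles the marker transform.

    The pipeline only records the intent (the URN). The decision is made
    per runner, so the runner need not be known at construction time.
    """
    if runner.gbk_output_is_stable:
        # This runner can satisfy the request by swapping in a reshuffle.
        return "reshuffle"
    # Otherwise reject, rather than silently running without the guarantee.
    raise UnsupportedTransformError(
        f"{runner.name} cannot honor {STABLE_INPUT_URN}"
    )
```

The point of the separate marker is that intent is stated once in the pipeline, while each runner chooses between substituting a Reshuffle and failing loudly.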
Re: What is the future of Reshuffle?
On Sat, May 19, 2018 at 8:11 AM Robert Bradshaw wrote:

> On Fri, May 18, 2018 at 6:29 PM Raghu Angadi wrote:
>
>> True. I am still failing to see what is broken about Reshuffle that is
>> not also broken with the GroupByKey transform. If someone depends on
>> GroupByKey to get stable input, isn't that equally incorrect/unportable?
>
> Yes, if people use GBK in that way, it's also just as broken. The thought
> is that fewer people would use it with that intent, as GBK is not a no-op
> (it transforms the shape of the data, and also does not preserve
> windowing). This is in contrast to Reshuffle, which was encouraged for
> this use case.

I see. I am not aware of any recommendation for users (excluding advanced users) to use this for stable input/durability guarantees. Every single case where I recommended Reshuffle was related to parallelism (there were many such cases). Most uses of Reshuffle/GBK for stable input were made consciously by the authors, fully aware of the caveats (SDF in Dataflow, Kafka EOS sink use of GBK, etc).

As a result, deprecation is only hurting the innocent users who are using Reshuffle correctly.

I think it would be much more user friendly to un-deprecate it and add a warning for advanced users about non-portability of durability/replay guarantees/stable input assumptions.

Raghu.

>>>>> We tend to put in reshuffles in order to "commit" these random values
>>>>> and make them stable for the next stage, to be used to provide the
>>>>> needed idempotency for sinks.
>>>>
>>>> In such cases, I think the author should error out on runners that
>>>> don't provide that guarantee. That is what ExactlyOnceSink in KafkaIO
>>>> does [1].
>>>>
>>>> [1] https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1049
>>>
>>> We're moving to a world where the runner may not be known at pipeline
>>> construction time. However, explicitly using a (distinct)
>>> make-input-stable transform when that's the intent (which could be a
>>> primitive that runners should implement, possibly by swapping in
>>> Reshuffle, or reject) would allow for this. That being said, the exact
>>> semantics of this transform are a bit of a rabbit hole, which is why we
>>> never finished the job of deprecating Reshuffle. This is a case where
>>> doing something is better than doing nothing, and our use of URNs for
>>> this kind of thing is flexible enough that we can deprecate old ones
>>> if/when we have time to pound out the right solution.
>>>
>>>>> Kenn
>>>>>
>>>>> On Fri, May 18, 2018 at 4:05 PM Raghu Angadi wrote:
>>>>>
>>>>>> On Fri, May 18, 2018 at 12:21 PM Robert Bradshaw wrote:
>>>>>>
>>>>>>> On Fri, May 18, 2018 at 11:46 AM Raghu Angadi wrote:
>>>>>>>
>>>>>>>> Thanks Kenn.
>>>>>>>>
>>>>>>>> On Fri, May 18, 2018 at 11:02 AM Kenneth Knowles wrote:
>>>>>>>>
>>>>>>>>> The fact that its usage has grown probably indicates that we have
>>>>>>>>> a large number of transforms that can easily cause data loss /
>>>>>>>>> duplication.
>>>>>>>>
>>>>>>>> Is this specific to Reshuffle or is it true for any GroupByKey? I
>>>>>>>> see Reshuffle as just a wrapper around GBK.
>>>>>>>
>>>>>>> The issue is when it's used in such a way that data corruption can
>>>>>>> occur when the underlying GBK output is not stable.
>>>>>>
>>>>>> Could you describe this breakage in a bit more detail or give an
>>>>>> example? Apologies in advance, I know this came up in multiple
>>>>>> contexts in the past, but I haven't grokked the issue well. Is it the
>>>>>> window rewrite that Reshuffle does that causes misuse of GBK?
>>>>>>
>>>>>> Thanks.
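To make the quoted concern about "committing" random values concrete, here is a toy model of why an idempotent sink needs stable input. It is a sketch in plain Python under stated assumptions, not Beam code: `IdempotentSink`, `assign_ids`, and `run_with_retry` are hypothetical names, a "retry" is simulated by simply processing the same values twice, and `commit_ids=True` stands in for a runner that durably stores the generated IDs before the next stage runs.

```python
import random


class IdempotentSink:
    """Toy sink that deduplicates writes by record ID (hypothetical)."""

    def __init__(self):
        self.rows = {}

    def write(self, record_id, value):
        # A second write with the same ID is ignored.
        self.rows.setdefault(record_id, value)


def assign_ids(values, rng):
    """Attach a random 64-bit ID to each value (non-deterministic on purpose)."""
    return [(rng.getrandbits(64), v) for v in values]


def run_with_retry(values, sink, rng, commit_ids):
    """Write `values` twice, simulating a retried bundle.

    With commit_ids=True the IDs are generated once and the retry reuses
    them (the stable-input case). With commit_ids=False the retry
    regenerates the IDs, so the sink cannot recognize the duplicates.
    Returns the number of rows the sink ends up holding.
    """
    committed = assign_ids(values, rng) if commit_ids else None
    for _attempt in range(2):
        batch = committed if commit_ids else assign_ids(values, rng)
        for record_id, value in batch:
            sink.write(record_id, value)
    return len(sink.rows)
```

In this model, three input values leave three rows in the sink when the IDs are committed, and (barring an astronomically unlikely ID collision) six rows when they are regenerated on retry, which is the duplication the thread is worried about on runners whose GBK output is not stable.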
Re: What is the future of Reshuffle?
On Fri, May 18, 2018 at 6:29 PM Raghu Angadi wrote:

> On Fri, May 18, 2018 at 5:34 PM Robert Bradshaw wrote:
>
>>> Ah, thanks, that makes sense. That implies to me Reshuffle is no more
>>> broken than GBK itself. Maybe Reshuffle.viaRandomKey() could have a
>>> clear caveat. Reshuffle's JavaDoc could add a caveat too about
>>> non-deterministic keys and retries (though it applies to GroupByKey in
>>> general).
>>
>> The "randomness" of Reshuffle.viaRandomKey() is fine, as the randomly
>> generated key is never exposed to the users (so it doesn't matter if it
>> changes).
>
> Agreed.
>
>> Reshuffle is broken if you are using it to get stable input on a runner
>> that doesn't always have stable input as an implementation detail of GBKs.
>
> True. I am still failing to see what is broken about Reshuffle that is
> not also broken with the GroupByKey transform. If someone depends on
> GroupByKey to get stable input, isn't that equally incorrect/unportable?

Yes, if people use GBK in that way, it's also just as broken. The thought is that fewer people would use it with that intent, as GBK is not a no-op (it transforms the shape of the data, and also does not preserve windowing). This is in contrast to Reshuffle, which was encouraged for this use case.

> Raghu.
>
>>>> We tend to put in reshuffles in order to "commit" these random values
>>>> and make them stable for the next stage, to be used to provide the
>>>> needed idempotency for sinks.
>>>
>>> In such cases, I think the author should error out on runners that
>>> don't provide that guarantee. That is what ExactlyOnceSink in KafkaIO
>>> does [1].
>>>
>>> [1] https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1049
>>
>> We're moving to a world where the runner may not be known at pipeline
>> construction time. However, explicitly using a (distinct)
>> make-input-stable transform when that's the intent (which could be a
>> primitive that runners should implement, possibly by swapping in
>> Reshuffle, or reject) would allow for this. That being said, the exact
>> semantics of this transform are a bit of a rabbit hole, which is why we
>> never finished the job of deprecating Reshuffle. This is a case where
>> doing something is better than doing nothing, and our use of URNs for
>> this kind of thing is flexible enough that we can deprecate old ones
>> if/when we have time to pound out the right solution.
>>
>>>> Kenn
>>>>
>>>> On Fri, May 18, 2018 at 4:05 PM Raghu Angadi wrote:
>>>>
>>>>> On Fri, May 18, 2018 at 12:21 PM Robert Bradshaw wrote:
>>>>>
>>>>>> On Fri, May 18, 2018 at 11:46 AM Raghu Angadi wrote:
>>>>>>
>>>>>>> Thanks Kenn.
>>>>>>>
>>>>>>> On Fri, May 18, 2018 at 11:02 AM Kenneth Knowles wrote:
>>>>>>>
>>>>>>>> The fact that its usage has grown probably indicates that we have
>>>>>>>> a large number of transforms that can easily cause data loss /
>>>>>>>> duplication.
>>>>>>>
>>>>>>> Is this specific to Reshuffle or is it true for any GroupByKey? I
>>>>>>> see Reshuffle as just a wrapper around GBK.
>>>>>>
>>>>>> The issue is when it's used in such a way that data corruption can
>>>>>> occur when the underlying GBK output is not stable.
>>>>>
>>>>> Could you describe this breakage in a bit more detail or give an
>>>>> example? Apologies in advance, I know this came up in multiple
>>>>> contexts in the past, but I haven't grokked the issue well. Is it the
>>>>> window rewrite that Reshuffle does that causes misuse of GBK?
>>>>>
>>>>> Thanks.
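The remarks above that Reshuffle is "just a wrapper around GBK" and that the random key "is never exposed to the users" can be pictured with a small model. This is a rough sketch in plain Python, not the Beam implementation: `reshuffle_via_random_key` and its `num_keys` parameter are hypothetical, an in-memory dictionary stands in for the shuffle, and windowing is ignored entirely.

```python
import random
from collections import Counter, defaultdict


def reshuffle_via_random_key(elements, num_keys=4, rng=None):
    """Toy model: key randomly, group by key, then drop the keys again."""
    if rng is None:
        rng = random.Random()
    # 1. Pair each element with a random key.
    keyed = [(rng.randrange(num_keys), element) for element in elements]
    # 2. Group by key (stand-in for the GroupByKey / shuffle step).
    groups = defaultdict(list)
    for key, element in keyed:
        groups[key].append(element)
    # 3. Discard the keys and flatten, so callers never see them.
    return [element for key in groups for element in groups[key]]


# Usage: the output holds the same elements, possibly in a different order.
data = list(range(10))
shuffled = reshuffle_via_random_key(data, rng=random.Random(42))
assert Counter(shuffled) == Counter(data)
```

Because only the multiset of elements is observable, a different draw of keys on retry changes nothing a user can see; the problem discussed in this thread arises only when downstream code additionally assumes the grouped output is durably committed.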