On Sat, May 19, 2018 at 10:55 PM Robert Bradshaw
wrote:
> On Sat, May 19, 2018 at 6:27 PM Raghu Angadi wrote:
>
>> On Sat, May 19, 2018 at 8:11 AM Robert Bradshaw
>> wrote:
>>
>>> On Fri, May 18, 2018 at 6:29 PM Raghu Angadi wrote:
>>>
True. I am still failing to see what is broken about Reshuffle that is
also not broken with GroupByKey transform. If someone depends on GroupByKey
to get stable input, isn't that equally incorrect/unportable?
>>>
>>> Yes, if people use GBK in that way, it's also just as broken. The
>>> thought is that fewer people would use it with that intent, as GBK is not a
>>> no-op (it transforms the shape of the data, and also does not preserve
>>> windowing). This is in contrast to Reshuffle which was encouraged for this
>>> usecase.
>>>
>>
>> I see. I am not aware of any recommendation for users (excluding advanced
>> users) to use this for stable input/durability gaurantees. Every single
>> case where I recommended Reshuffle was related to parallelism (there were
>> many such cases). Most of use of Reshuflle/GBK for stable input were done
>> consciously by the authors, fully aware of the caveats (SDF in Dataflow,
>> Kafka EOS sink use of GBK, etc).
>>
>> As a result, deprecation is only hurting the innocent users who are using
>> Reshuffle correctly.
>>
>> I think it would be much more user friendly to un-deprecate it to add a
>> warning for advanced users about non-portability of durability/replay
>> guarantees/stable input assumptions.
>>
>>>
> Yes, I think everyone in this thread is in agreement here. We should
> provide a *different* transform that provides the durability guarantees
> (with caveats). In the meantime, this delegating to a reshuffle would be
> better than using a reshuffle directly.
>
The different thing has already been discussed on the dev thread - it's the
RequresStableInput property on a DoFn's process method. The annotation has
even been added already (for Java), it just has never been hooked up.
>
>
>> We tend to put in reshuffles in order to "commit" these random values and
>>> make them stable for the next stage, to be used to provide the needed
>>> idempotency for sinks.
>>>
>>
>> In such cases, I think the author should error out on the runner that
>> don't provide that guarantee. That is what ExactlyOnceSink in KafkaIO
>> does
>> [1].
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1049
>>
>
> We're moving to a world where the runner may not be known at pipeline
> construction time. However, explicitly using a (distinct)
> make-input-stable
> transform when that's the intent (which could be a primitive that runners
> should implement, possibly by swapping in Reshuffle, or reject) would
> allow
> for this. That being said, the exact semantics of this transform is a bit
> of a rabbit hole which is why we never finished the job of deprecating
> Reshuffle. This is a case where doing something is better than doing
> nothing, and our use of URNs for this kind of thing is flexible enough
> that
> we can deprecate old ones if/when we have time to pound out the right
> solution.
>
>
>>
>>> Kenn
>>>
>>> On Fri, May 18, 2018 at 4:05 PM Raghu Angadi
>>> wrote:
>>>
On Fri, May 18, 2018 at 12:21 PM Robert Bradshaw <
rober...@google.com> wrote:
> On Fri, May 18, 2018 at 11:46 AM Raghu Angadi
> wrote:
>
>> Thanks Kenn.
>>
>> On Fri, May 18, 2018 at 11:02 AM Kenneth Knowles
>> wrote:
>>
>>> The fact that its usage has grown probably indicates that we
>>> have a large number of transforms that can easily cause data loss /
>>> duplication.
>>>
>>
>> Is this specific to Reshuffle or it is true for any GroupByKey? I
>> see Reshuffle as just a wrapper around GBK.
>>
> The issue is when it's used in such a way that data corruption can
> occur when the underlying GBK output is not stable.
>
Could you describe this breakage bit more in detail or give a
example? Apologies in advance, I know this came up in multiple
contexts in
the past, but I haven't grokked the issue well. It is the window
rewrite
that Reshuffle does that causes misuse of GBK?
Thanks.
>>>