Hi,
I have a question about the @RequiresStableInput functionality. I'm trying
to make it work for the portable Flink runner [1], [2]. We have an
integration test for it (which should probably be turned into a ValidatesRunner
test, but that is a different story) [3]. The test assigns a random key
to an input element, processes it once, fails the pipeline and then
reprocesses it. This works well provided there is a checkpoint (a shuffle
in the case of Dataflow) exactly between assigning the random key (via
PairWithRandomKeyFn) and processing the keyed element (via
MakeSideEffectAndThenFailFn).
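To make the mechanism concrete, here is a toy model in plain Java. It is not Beam code: the class and method names are hypothetical stand-ins for PairWithRandomKeyFn and MakeSideEffectAndThenFailFn. It assumes the checkpoint simply persists the keyed element, so only the failing step is retried and both attempts see the same key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Toy model (not Beam code): the keyed element is checkpointed before the failing step. */
final class CheckpointedRetrySimulation {

  /** Hypothetical stand-in for PairWithRandomKeyFn. */
  static String pairWithRandomKey(String value) {
    return UUID.randomUUID() + ":" + value;
  }

  /** Hypothetical stand-in for MakeSideEffectAndThenFailFn. */
  static void makeSideEffectAndThenFail(String keyed, List<String> sink, boolean fail) {
    sink.add(keyed); // the side effect: record what this attempt saw
    if (fail) {
      throw new IllegalStateException("simulated pipeline failure");
    }
  }

  /** Returns the keyed element observed by each attempt of the failing step. */
  static List<String> run(String value) {
    List<String> sideEffects = new ArrayList<>();
    // Key assignment runs once and its output is persisted (the "checkpoint").
    String checkpointed = pairWithRandomKey(value);
    boolean failedOnce = false;
    while (true) {
      try {
        // Only the downstream step is retried, always reading from the checkpoint.
        makeSideEffectAndThenFail(checkpointed, sideEffects, !failedOnce);
        return sideEffects;
      } catch (IllegalStateException e) {
        failedOnce = true;
      }
    }
  }

  public static void main(String[] args) {
    List<String> seen = run("value");
    System.out.println("stable key across retry: " + seen.get(0).equals(seen.get(1)));
  }
}
```

Running main prints "stable key across retry: true", which is what the test expects from a runner that honors @RequiresStableInput.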
The problem is that GreedyPipelineFuser fuses PairWithRandomKeyFn and
MakeSideEffectAndThenFailFn into a single ExecutableStage. That stage is
then executed with the @RequiresStableInput requirement, but on retry the
whole stage runs again, so the reprocessed element(s) obviously get a
different key. It looks like we need to fix this in the PipelineFuser,
is that right? Does this mean the @RequiresStableInput functionality is
actually broken for all runners that use the default fusion?
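A similar toy model of the fused case, again plain Java with hypothetical names rather than Beam or GreedyPipelineFuser code. Here key assignment and the side effect run inside one stage, and the assumption is that a retry replays the whole stage from its input, so the random key is drawn again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Toy model (not Beam code): both steps are fused into one stage. */
final class FusedRetrySimulation {

  /** Hypothetical stand-in for PairWithRandomKeyFn. */
  static String pairWithRandomKey(String value) {
    return UUID.randomUUID() + ":" + value;
  }

  /** Hypothetical stand-in for MakeSideEffectAndThenFailFn. */
  static void makeSideEffectAndThenFail(String keyed, List<String> sink, boolean fail) {
    sink.add(keyed); // the side effect: record what this attempt saw
    if (fail) {
      throw new IllegalStateException("simulated pipeline failure");
    }
  }

  /** One fused stage: key assignment and the side effect run back to back. */
  static void fusedStage(String value, List<String> sink, boolean fail) {
    makeSideEffectAndThenFail(pairWithRandomKey(value), sink, fail);
  }

  /** Returns the keyed element observed by each attempt of the fused stage. */
  static List<String> run(String value) {
    List<String> sideEffects = new ArrayList<>();
    boolean failedOnce = false;
    while (true) {
      try {
        // A retry replays the whole stage from its (unkeyed) input,
        // so pairWithRandomKey is called again and draws a new key.
        fusedStage(value, sideEffects, !failedOnce);
        return sideEffects;
      } catch (IllegalStateException e) {
        failedOnce = true;
      }
    }
  }

  public static void main(String[] args) {
    List<String> seen = run("value");
    System.out.println("stable key across retry: " + seen.get(0).equals(seen.get(1)));
  }
}
```

The two recorded keys differ (barring a UUID collision), which mirrors the failure I see in the test.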
Another possibility is that we need to fix the test by adding an explicit
Reshuffle (verified, this works), but I think the test is actually
correct: users would probably not expect transforms to be fused across
the @RequiresStableInput boundary.
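For the fuser-side fix, one possible rule (a sketch of the idea, not how GreedyPipelineFuser is actually implemented) is to never fuse a transform that requires stable input with its producer, so its input always crosses a stage boundary that the runner can materialize. The example assumes a simple linear chain of transforms; Transform and fuse are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical greedy fusion of a linear chain that respects stable-input boundaries. */
final class StableInputAwareFusion {

  /** Hypothetical description of one transform in the chain. */
  static final class Transform {
    final String name;
    final boolean requiresStableInput;

    Transform(String name, boolean requiresStableInput) {
      this.name = name;
      this.requiresStableInput = requiresStableInput;
    }
  }

  /**
   * Fuses consecutive transforms into stages, but starts a new stage before any
   * transform that requires stable input, so its input is never recomputed
   * inside the same stage on retry.
   */
  static List<List<String>> fuse(List<Transform> chain) {
    List<List<String>> stages = new ArrayList<>();
    List<String> current = new ArrayList<>();
    for (Transform t : chain) {
      if (t.requiresStableInput && !current.isEmpty()) {
        stages.add(current); // close the producer's stage at the boundary
        current = new ArrayList<>();
      }
      current.add(t.name);
    }
    if (!current.isEmpty()) {
      stages.add(current);
    }
    return stages;
  }
}
```

For a chain of a hypothetical Source, PairWithRandomKeyFn and MakeSideEffectAndThenFailFn (the last one marked as requiring stable input), fuse returns [[Source, PairWithRandomKeyFn], [MakeSideEffectAndThenFailFn]]. A real fuser works on a DAG rather than a chain, but the boundary rule would be the same.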
Thoughts?
Jan
[1] https://github.com/apache/beam/issues/20812
[2] https://github.com/apache/beam/pull/22889
[3] https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/RequiresStableInputIT.java