Hi,

I have a question about the @RequiresStableInput functionality. I'm trying to make it work for the portable Flink runner [1], [2]. We have an integration test (which should probably be turned into a ValidatesRunner test, but that is a different story) [3]. The test creates a random key for the input element, processes it once, fails the pipeline, and then reprocesses the element. This works well provided there is a checkpoint (a shuffle in the case of Dataflow) exactly between assigning the random key (via PairWithRandomKeyFn) and processing it (via MakeSideEffectAndThenFailFn).

The problem is that GreedyPipelineFuser fuses the transforms PairWithRandomKeyFn and MakeSideEffectAndThenFailFn into a single ExecutableStage. That stage is then executed with the @RequiresStableInput requirement, but because the key assignment is part of the same stage, it runs again on retry and assigns a different key to the reprocessed element(s). It looks like we need to fix that in the PipelineFuser, is this right? Does this mean the @RequiresStableInput functionality is actually broken for all runners that use the default fusion?
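To make the failure mode concrete, here is a minimal sketch in plain Java (no Beam APIs) of what I believe is happening. All names in it (StableInputSketch, pairWithRandomKey, runFused, runWithCheckpoint) are hypothetical stand-ins, not the actual test or runner code, and the "retry" is simply modelled as a loop over attempts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class StableInputSketch {

    // Hypothetical stand-in for PairWithRandomKeyFn: non-deterministic by design.
    static String pairWithRandomKey(String value, Random rng) {
        return Long.toHexString(rng.nextLong()) + ":" + value;
    }

    // Fused case: key assignment and the side-effecting step live in one stage,
    // so every retry of the stage replays the key assignment as well.
    static List<String> runFused(String value, Random rng, int attempts) {
        List<String> seenBySideEffectStep = new ArrayList<>();
        for (int attempt = 0; attempt < attempts; attempt++) {
            String keyed = pairWithRandomKey(value, rng);
            seenBySideEffectStep.add(keyed);
        }
        return seenBySideEffectStep;
    }

    // Checkpointed case: the keyed element is materialized once (checkpoint or
    // shuffle), and every retry of the downstream step replays that same element.
    static List<String> runWithCheckpoint(String value, Random rng, int attempts) {
        String checkpointed = pairWithRandomKey(value, rng);
        List<String> seenBySideEffectStep = new ArrayList<>();
        for (int attempt = 0; attempt < attempts; attempt++) {
            seenBySideEffectStep.add(checkpointed);
        }
        return seenBySideEffectStep;
    }

    public static void main(String[] args) {
        System.out.println("fused:        " + runFused("value", new Random(), 2));
        System.out.println("checkpointed: " + runWithCheckpoint("value", new Random(), 2));
    }
}
```

In the fused case the two attempts observe different keys for the same value, which is what the integration test trips over; with the boundary materialized, both attempts observe the same keyed element.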

Another possibility is to fix the test by adding an explicit reshuffle (I verified that this works), but I think the test is actually correct: users would probably not expect transforms to be fused across the @RequiresStableInput boundary.

Thoughts?

 Jan


[1] https://github.com/apache/beam/issues/20812
[2] https://github.com/apache/beam/pull/22889
[3] https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/RequiresStableInputIT.java
