Filed https://github.com/apache/beam/issues/24655.
Jan
On 12/14/22 00:52, Luke Cwik via dev wrote:
This is definitely not working for portable pipelines, since the
GreedyPipelineFuser doesn't create a fusion boundary. As you pointed
out, that produces a single stage containing a non-deterministic
function followed by one that requires stable input. It seems as
though we should have runners check the requirements on the
Pipeline[1] to ensure that they can faithfully process such a pipeline
and reject anything they don't support early on.
Making the GreedyPipelineFuser insert that fusion break is likely the
way to go. Runners should be able to look at the ParDoPayload
requires_stable_input field for the ExecutableStage to see if any
special handling is necessary on their end before they pass data to
that stage.
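
A rough sketch of the fusion break idea, in plain Java with no Beam dependency. This is a toy model of a linear chain only, not the GreedyPipelineFuser API: FusionBreakSketch, Transform and fuse are hypothetical names, and the boolean flag stands in for the requires_stable_input field on ParDoPayload.

```java
import java.util.ArrayList;
import java.util.List;

final class FusionBreakSketch {
    // Hypothetical stand-in for a ParDo and its requires_stable_input flag.
    static final class Transform {
        final String name;
        final boolean requiresStableInput;

        Transform(String name, boolean requiresStableInput) {
            this.name = name;
            this.requiresStableInput = requiresStableInput;
        }
    }

    // Greedily fuses a linear chain into stages, but closes the current stage
    // before any transform that requires stable input, so that transform
    // always starts a new stage.
    static List<List<String>> fuse(List<Transform> chain) {
        List<List<String>> stages = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (Transform t : chain) {
            if (t.requiresStableInput && !current.isEmpty()) {
                stages.add(current);
                current = new ArrayList<>();
            }
            current.add(t.name);
        }
        if (!current.isEmpty()) {
            stages.add(current);
        }
        return stages;
    }
}
```

With the two transforms from the test, this yields two stages instead of one, which gives the runner a place to checkpoint or otherwise stabilize the data before the second stage runs.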
[1]:
https://github.com/apache/beam/blob/77af3237521d94f0399ab405ebac09bbbeded38c/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L111
On Tue, Dec 13, 2022 at 1:44 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi,
I have a question about @RequiresStableInput functionality. I'm
trying to make it work for the portable Flink runner [1], [2]. We have
an integration test (which should probably be turned into a
ValidatesRunner test, but that is a different story) [3]. The
test creates a random key for each input element, processes it once,
fails the pipeline and then reprocesses it. This works well
provided there is a checkpoint (a shuffle in the case of Dataflow)
exactly between assigning the random key (via PairWithRandomKeyFn) and
processing it (via MakeSideEffectAndThenFailFn).
The problem is that GreedyPipelineFuser fuses the transforms
PairWithRandomKeyFn and MakeSideEffectAndThenFailFn into a single
ExecutableStage. This is then executed with the
@RequiresStableInput requirement, but this obviously assigns a
different key to the reprocessed element(s). This looks like we
need to fix that in the PipelineFuser, is this right? Does this
mean the @RequiresStableInput functionality is actually broken for
all runners that use the default fusion?
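
A minimal sketch of the failure mode described above, in plain Java with no Beam dependency. The names StableInputSketch, runFused and runWithBoundary are hypothetical, and UUID.randomUUID() only stands in for whatever PairWithRandomKeyFn actually does. Each loop iteration models one attempt to process the same element after a failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

final class StableInputSketch {
    // Stand-in for PairWithRandomKeyFn: returns a fresh key on every call.
    static String pairWithRandomKey(String element) {
        return UUID.randomUUID() + ":" + element;
    }

    // Fused stage: every retry re-runs the key assignment, so the
    // side-effecting step sees a different input on each attempt.
    static List<String> runFused(String element, int attempts) {
        List<String> seenByDownstream = new ArrayList<>();
        for (int i = 0; i < attempts; i++) {
            seenByDownstream.add(pairWithRandomKey(element));
        }
        return seenByDownstream;
    }

    // Boundary between the steps: the keyed element is materialized once,
    // and every retry replays the stored value.
    static List<String> runWithBoundary(String element, int attempts) {
        String checkpointed = pairWithRandomKey(element);
        List<String> seenByDownstream = new ArrayList<>();
        for (int i = 0; i < attempts; i++) {
            seenByDownstream.add(checkpointed);
        }
        return seenByDownstream;
    }
}
```

In the fused variant the two attempts observe different keys, which is exactly what a stable-input requirement is meant to rule out. In the variant with a boundary, both attempts observe the same keyed element.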
Another possibility is that we need to fix the test by adding an
explicit reshuffle (verified, this works), but I think that the
test is actually correct: users would probably not expect
transforms to be fused when crossing the @RequiresStableInput
boundary.
Thoughts?
Jan
[1] https://github.com/apache/beam/issues/20812
[2] https://github.com/apache/beam/pull/22889
[3]
https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/RequiresStableInputIT.java