Filed https://github.com/apache/beam/issues/24655.

 Jan

On 12/14/22 00:52, Luke Cwik via dev wrote:
This is definitely not working for portable pipelines, since the GreedyPipelineFuser doesn't create a fusion boundary. As you pointed out, this produces a single stage in which a non-deterministic function is followed by one that requires stable input. It seems we should have runners check the requirements on the Pipeline[1] to ensure that they can faithfully process such a pipeline, and reject anything they don't support early on.
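A minimal sketch of such an early check, assuming the runner has already extracted the pipeline's requirement URNs into a list and knows which ones it supports. The class and method names are hypothetical, and the URN strings are modeled on Beam's standard requirement URNs; they should be checked against beam_runner_api.proto rather than taken as authoritative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

class RequirementsCheck {
    // Assumed URNs, modeled on Beam's standard requirements; verify them
    // against beam_runner_api.proto before relying on them.
    static final String STABLE_INPUT = "beam:requirement:pardo:stable_input:v1";
    static final String STATEFUL = "beam:requirement:pardo:stateful:v1";

    // Returns every requirement of the pipeline that this (hypothetical)
    // runner does not declare as supported, in pipeline order.
    static List<String> unsupported(List<String> pipelineRequirements, Set<String> supported) {
        List<String> missing = new ArrayList<>();
        for (String urn : pipelineRequirements) {
            if (!supported.contains(urn)) {
                missing.add(urn);
            }
        }
        return missing;
    }

    // Rejects the pipeline up front, before any data is processed.
    static void validate(List<String> pipelineRequirements, Set<String> supported) {
        List<String> missing = unsupported(pipelineRequirements, supported);
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException("Runner does not support requirements: " + missing);
        }
    }

    public static void main(String[] args) {
        Set<String> supported = Set.of(STATEFUL);
        List<String> required = List.of(STATEFUL, STABLE_INPUT);
        try {
            validate(required, supported);
            System.out.println("accepted");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing at submission time means a runner that cannot honor stable input reports it immediately, instead of silently producing a pipeline whose retries see different input.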

Making the GreedyPipelineFuser insert that fusion break is likely the way to go. Runners should be able to check the requires_stable_input field of the ParDoPayload in each ExecutableStage to see whether any special handling is needed on their end before they pass data to that stage.
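As a rough illustration, assuming a simplified linear chain of transforms rather than the real graph of PCollections and ExecutableStages that GreedyPipelineFuser works on, the fusion break could look like the following. Transform and fuse are hypothetical names; the only point is that a transform requiring stable input starts a new stage, so its input crosses a stage boundary where the runner can materialize it.

```java
import java.util.ArrayList;
import java.util.List;

class FusionSketch {
    // Hypothetical, simplified view of one transform in a linear chain.
    static final class Transform {
        final String name;
        final boolean requiresStableInput;

        Transform(String name, boolean requiresStableInput) {
            this.name = name;
            this.requiresStableInput = requiresStableInput;
        }
    }

    // Greedily fuses the chain into stages. When breakOnStableInput is set,
    // a transform that requires stable input closes the current stage and
    // starts a new one (a fusion break), so its input is not recomputed
    // inside the same stage on retry.
    static List<List<String>> fuse(List<Transform> chain, boolean breakOnStableInput) {
        List<List<String>> stages = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (Transform t : chain) {
            if (breakOnStableInput && t.requiresStableInput && !current.isEmpty()) {
                stages.add(current);
                current = new ArrayList<>();
            }
            current.add(t.name);
        }
        if (!current.isEmpty()) {
            stages.add(current);
        }
        return stages;
    }

    public static void main(String[] args) {
        List<Transform> chain = List.of(
            new Transform("PairWithRandomKeyFn", false),
            new Transform("MakeSideEffectAndThenFailFn", true));
        // [[PairWithRandomKeyFn, MakeSideEffectAndThenFailFn]]
        System.out.println(fuse(chain, false));
        // [[PairWithRandomKeyFn], [MakeSideEffectAndThenFailFn]]
        System.out.println(fuse(chain, true));
    }
}
```

With the break disabled, both DoFns from the test land in one stage; with it enabled, MakeSideEffectAndThenFailFn gets its own stage whose input can be persisted before it runs.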

[1]: https://github.com/apache/beam/blob/77af3237521d94f0399ab405ebac09bbbeded38c/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L111

On Tue, Dec 13, 2022 at 1:44 AM Jan Lukavský <je...@seznam.cz> wrote:

    Hi,

    I have a question about the @RequiresStableInput functionality. I'm
    trying to make it work for the portable Flink runner [1], [2]. We have
    an integration test (which should probably be turned into a
    ValidatesRunner test, but that is a different story) [3]. The
    test creates a random key for each input element, processes it once,
    fails the pipeline, and then reprocesses it. This works well
    provided there is a checkpoint (a shuffle in the case of Dataflow)
    exactly between assigning the random key (via PairWithRandomKeyFn)
    and processing the element (via MakeSideEffectAndThenFailFn).
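    A plain-Java simulation of the test's shape may make the failure
    mode concrete. It does not use Beam at all: sideEffects is a
    hypothetical helper that runs two attempts for one element and
    records what each attempt observed, with the checkpointed flag
    standing in for a checkpoint or shuffle between the two steps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.LongSupplier;

class StableInputReplay {
    // Hypothetical helper mimicking the test for one element: attempt 1 pairs
    // the element with a random key, performs a side effect, and "fails";
    // attempt 2 reprocesses the element. Returns the side effect of each attempt.
    // checkpointed = true models a checkpoint (or shuffle) between the steps;
    // checkpointed = false models both steps fused into one stage.
    static List<String> sideEffects(String element, boolean checkpointed, LongSupplier randomKey) {
        List<String> recorded = new ArrayList<>();
        Long materializedKey = null; // stands in for state persisted by the checkpoint
        for (int attempt = 1; attempt <= 2; attempt++) {
            long key;
            if (checkpointed && materializedKey != null) {
                key = materializedKey; // the retry replays the materialized input
            } else {
                key = randomKey.getAsLong(); // the non-deterministic step runs again
                if (checkpointed) {
                    materializedKey = key;
                }
            }
            recorded.add(key + ":" + element); // side effect seen by this attempt
        }
        return recorded;
    }

    public static void main(String[] args) {
        Random random = new Random();
        List<String> withCheckpoint = sideEffects("e1", true, random::nextLong);
        List<String> fused = sideEffects("e1", false, random::nextLong);
        // Stable input means both attempts observe the same keyed element.
        System.out.println("stable with checkpoint: " + withCheckpoint.get(0).equals(withCheckpoint.get(1)));
        System.out.println("stable when fused: " + fused.get(0).equals(fused.get(1)));
    }
}
```

    Only the checkpointed run replays the same key on retry; in the
    fused run the random key is drawn again, which is exactly what the
    integration test detects.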

    The problem is that GreedyPipelineFuser fuses the transforms
    PairWithRandomKeyFn and MakeSideEffectAndThenFailFn into a single
    ExecutableStage. This stage is then executed with the
    @RequiresStableInput requirement, but reprocessing it obviously
    assigns a different key to the reprocessed element(s). It looks like
    we need to fix this in the PipelineFuser; is that right? Does this
    mean the @RequiresStableInput functionality is actually broken for
    all runners that use the default fusion?

    Another possibility is that we need to fix the test by adding an
    explicit reshuffle (verified, this works), but I think that the
    test is actually correct: users would probably not expect
    transforms to be fused across the @RequiresStableInput
    boundary.

    Thoughts?

     Jan


    [1] https://github.com/apache/beam/issues/20812
    [2] https://github.com/apache/beam/pull/22889
    [3] https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/RequiresStableInputIT.java
