Re: @RequiresStableInput and Pipeline fusion
Filed https://github.com/apache/beam/issues/24655.

Jan

On 12/14/22 00:52, Luke Cwik via dev wrote:
> Making the GreedyPipelineFuser insert that fusion break is likely the way to go.
Re: @RequiresStableInput and Pipeline fusion
This is definitely not working for portable pipelines. The GreedyPipelineFuser doesn't create a fusion boundary there, so, as you pointed out, we end up with a single stage that contains a non-deterministic function followed by one that requires stable input.

It seems as though we should have runners check the requirements on the Pipeline [1] to ensure that they can faithfully process such a pipeline, and reject anything they don't support early on.

Making the GreedyPipelineFuser insert that fusion break is likely the way to go. Runners should be able to look at the ParDoPayload requires_stable_input field for the ExecutableStage to see if any special handling is necessary on their end before they pass data to that stage.

[1]: https://github.com/apache/beam/blob/77af3237521d94f0399ab405ebac09bbbeded38c/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L111

On Tue, Dec 13, 2022 at 1:44 AM Jan Lukavský wrote:
> This looks like we need to fix that in the PipelineFuser, is this right?
> Does this mean the @RequiresStableInput functionality is actually broken
> for all runners that use the default fusion?
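For illustration only, the fusion break suggested above could look roughly like the sketch below. It is not the actual GreedyPipelineFuser (which is Java and works on the full pipeline graph); it only handles a linear chain, and the names Transform and fuse_linear are hypothetical. The requires_stable_input attribute is meant to mirror the ParDoPayload field of the same name.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Transform:
    """Hypothetical stand-in for a ParDo in a linear pipeline."""
    name: str
    requires_stable_input: bool = False


def fuse_linear(transforms):
    """Greedily fuse a linear chain of transforms into stages.

    A new stage is started before any transform that requires stable
    input, so a runner can checkpoint the data crossing that boundary.
    """
    stages = []
    for transform in transforms:
        if not stages or transform.requires_stable_input:
            # Fusion break: open a new stage for this transform.
            stages.append([transform])
        else:
            # No special requirement: fuse into the current stage.
            stages[-1].append(transform)
    return stages


chain = [
    Transform("PairWithRandomKeyFn"),
    Transform("MakeSideEffectAndThenFailFn", requires_stable_input=True),
]
stages = fuse_linear(chain)
stage_names = [[t.name for t in stage] for stage in stages]
```

With this rule the two transforms from the test end up in separate stages, and the runner only has to inspect the first transform of a stage to decide whether its input must be made stable before it is handed over.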
@RequiresStableInput and Pipeline fusion
Hi,

I have a question about the @RequiresStableInput functionality. I'm trying to make it work for the portable Flink runner [1], [2]. We have an integration test (which should probably be turned into a ValidatesRunner test, but that is a different story) [3]. The test creates a random key for each input element, processes it once, fails the pipeline and then reprocesses it. This works well provided there is a checkpoint (a shuffle in the case of Dataflow) exactly between assigning the random key (via PairWithRandomKeyFn) and processing it (via MakeSideEffectAndThenFailFn).

The problem is that GreedyPipelineFuser fuses the transforms PairWithRandomKeyFn and MakeSideEffectAndThenFailFn into a single ExecutableStage. This stage is then executed with the @RequiresStableInput requirement, but on retry it re-runs PairWithRandomKeyFn as well, so it assigns a different key to the reprocessed element(s). It looks like we need to fix that in the PipelineFuser, is this right? Does this mean the @RequiresStableInput functionality is actually broken for all runners that use the default fusion?

Another possibility is to fix the test by adding an explicit reshuffle (verified, this works), but I think the test is actually correct: users would probably not expect transforms to be fused across the @RequiresStableInput boundary.

Thoughts?

Jan

[1] https://github.com/apache/beam/issues/20812
[2] https://github.com/apache/beam/pull/22889
[3] https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/RequiresStableInputIT.java
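To make the failure mode concrete, here is a small plain-Python simulation. It does not use Beam at all; the functions pair_with_random_key, run_fused and run_with_checkpoint are hypothetical stand-ins for the two DoFns in the test and for the two execution strategies, and a UUID is assumed as the random key.

```python
import uuid


def pair_with_random_key(element):
    """Non-deterministic step: attaches a fresh random key on every call."""
    return (uuid.uuid4().hex, element)


def run_fused(element, attempts=2):
    """Both steps in one stage: each retry re-runs the random-key step."""
    observed = []
    for _ in range(attempts):
        keyed = pair_with_random_key(element)
        observed.append(keyed)  # input seen by the stable-input step
    return observed


def run_with_checkpoint(element, attempts=2):
    """Checkpoint between the steps: the key is assigned once and replayed."""
    checkpointed = pair_with_random_key(element)
    return [checkpointed for _ in range(attempts)]


fused = run_fused("a")
stable = run_with_checkpoint("a")
```

In the fused case the second attempt sees a different key than the first, which is what the integration test detects. With the checkpoint in place both attempts see the same (key, element) pair.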