Correct. For batch, we rely on re-running the entire job, which produces stable input within each run.

For streaming, the Flink Runner buffers all input to a @RequiresStableInput DoFn until a checkpoint is complete; only then does it process the buffered data. Dataflow effectively does the same by going through the Shuffle service, which produces a consistent result.
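
To illustrate the buffering idea, here is a minimal sketch in plain Java. It is not the actual Flink Runner code and uses no Beam or Flink classes; the class CheckpointBuffer and its methods are hypothetical names chosen for this example. It also simplifies by assuming that every element buffered so far belongs to the checkpoint that just completed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of "buffer until the checkpoint is complete".
// Not Beam or Flink API; all names here are assumptions for illustration.
final class CheckpointBuffer<T> {
    private final List<T> pending = new ArrayList<>();
    private final Consumer<T> processElement;

    CheckpointBuffer(Consumer<T> processElement) {
        this.processElement = processElement;
    }

    // Arriving elements are only buffered, never processed directly.
    void onElement(T element) {
        pending.add(element);
    }

    // Once the checkpoint is complete, the buffered elements are stable:
    // a replay after failure would deliver exactly the same elements.
    // Simplification: everything buffered is treated as part of that checkpoint.
    void onCheckpointComplete() {
        List<T> stable = new ArrayList<>(pending);
        pending.clear();
        stable.forEach(processElement);
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        List<String> processed = new ArrayList<>();
        CheckpointBuffer<String> buffer = new CheckpointBuffer<>(processed::add);
        buffer.onElement("a");
        buffer.onElement("b");
        System.out.println(processed); // nothing has been processed yet
        buffer.onCheckpointComplete();
        System.out.println(processed); // both elements processed after the checkpoint
    }
}
```

The cost of this approach is latency: nothing reaches the DoFn until the next checkpoint completes, so output is delayed by up to one checkpoint interval.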


On 08.07.20 11:08, Jozef Vilcek wrote:
My last question was more towards the graph translation for batch mode.

Should a DoFn with @RequiresStableInput be translated/expanded in some specific way (e.g. DoFn -> Reshuffle + DoFn), or is that not needed for batch? Most runners fail in the presence of @RequiresStableInput for both batch and streaming. I cannot find such a failure for Flink and Dataflow, but at the same time I cannot find what those runners do with such a DoFn.

On Tue, Jul 7, 2020 at 9:18 PM Kenneth Knowles wrote:

    I hope someone who knows better than me can respond.

    A long time ago, the SparkRunner added a call to materialize() at
    every GroupByKey. This was to mimic Dataflow, since so many of the
    initial IO transforms relied on using shuffle to create stable inputs.

    The overall goal is to be able to remove these extra calls to
    materialize() and only include them when @RequiresStableInput.

    The intermediate state is to analyze whether input is already stable
    from materialize() and add another materialize() only if it is not.

    I don't know the current state of the SparkRunner. This may already
    have changed.


    On Thu, Jul 2, 2020 at 10:24 PM Jozef Vilcek wrote:

        I was trying to look for references on how other runners handle
        @RequiresStableInput for batch cases, however I was not able to
        find any.
        In Flink I can see support added for the streaming case, and in
        Dataflow I see that support for the feature was turned off.

        It seems to me that @RequiresStableInput is ignored for the
        batch case and the runner relies on being able to recompute the
        whole job in the worst case scenario.
        Is this assumption correct?
        Could I just change SparkRunner to crash on @RequiresStableInput
        annotation for streaming mode and ignore it in batch?

        On Wed, Jul 1, 2020 at 10:27 AM Jozef Vilcek wrote:

            We have a component which we use in streaming and batch
            jobs. We run streaming on FlinkRunner and batch on
            SparkRunner. Recently we needed to add @RequiresStableInput
            to that component because of the streaming use case. But now
            the batch case crashes on SparkRunner with:

            Caused by: java.lang.UnsupportedOperationException: Spark runner currently doesn't support @RequiresStableInput annotation.
                at org.apache.beam.sdk.Pipeline.replace(
                at org.apache.beam.sdk.Pipeline.replaceAll(
                at scala.util.Try$.apply(Try.scala:192)

            We are using Beam 2.19.0. Is @RequiresStableInput
            problematic to support for both the streaming and batch
            use cases? What are the options here?
