Correct, for batch we rely on re-running the entire job, which produces stable input within each run.

For streaming, the Flink Runner buffers all input to a @RequiresStableInput DoFn until a checkpoint completes, and only then processes the buffered data. Dataflow effectively does the same by routing the data through the Shuffle service, which produces a consistent result.
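As a toy sketch of that buffer-until-checkpoint idea (illustrative names only, not the actual Flink runner code): elements are held back instead of being processed, and are released to the DoFn only once a completed checkpoint guarantees they can be replayed identically on failure.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the buffer-until-checkpoint pattern. The class and
// method names are illustrative; they do not correspond to real Flink runner
// internals.
class StableInputBuffer<T> {
    private final List<T> pending = new ArrayList<>();

    // Instead of invoking the DoFn immediately, buffer the element.
    void onElement(T element) {
        pending.add(element);
    }

    // A completed checkpoint means the buffered elements are durably
    // persisted and replayable: release them for processing and start
    // a fresh buffer for the next checkpoint interval.
    List<T> onCheckpointComplete() {
        List<T> stable = new ArrayList<>(pending);
        pending.clear();
        return stable;
    }
}
```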

-Max

On 08.07.20 11:08, Jozef Vilcek wrote:
My last question was more towards the graph translation for batch mode.

Should a DoFn with @RequiresStableInput be translated/expanded in some specific way (e.g. DoFn -> Reshuffle + DoFn), or is that not needed for batch? Most runners fail in the presence of @RequiresStableInput in both batch and streaming. I cannot find a failure for Flink and Dataflow, but at the same time I cannot find what those runners do with such a DoFn.
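In user-facing Beam terms, the expansion the question describes would look roughly like the following fragment (an illustrative sketch, not what any runner actually does internally; `MyStableDoFn` is a hypothetical DoFn carrying the annotation):

```java
// Illustrative sketch only: inserting a Reshuffle in front of a DoFn that
// declares @RequiresStableInput, so that the shuffle materialization acts as
// the stable-input point. MyStableDoFn is a hypothetical user DoFn.
input
    .apply(Reshuffle.viaRandomKey())       // materialization barrier
    .apply(ParDo.of(new MyStableDoFn()));  // @RequiresStableInput DoFn
```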

On Tue, Jul 7, 2020 at 9:18 PM Kenneth Knowles <k...@apache.org> wrote:

    I hope someone who knows better than me can respond.

    A long time ago, the SparkRunner added a call to materialize() at
    every GroupByKey. This was to mimic Dataflow, since so many of the
    initial IO transforms relied on using shuffle to create stable inputs.

    The overall goal is to be able to remove these extra calls to
    materialize() and only include them when @RequiresStableInput.

    The intermediate state is to analyze whether input is already stable
    from materialize() and add another materialize() only if it is not
    stable.

    I don't know the current state of the SparkRunner. This may already
    have changed.

    Kenn

    On Thu, Jul 2, 2020 at 10:24 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:

        I was trying to find references on how other runners handle
        @RequiresStableInput for batch cases; however, I was not able to
        find any.
        In Flink I can see added support for the streaming case, and in
        Dataflow I see that support for the feature was turned off:
        https://github.com/apache/beam/pull/8065

        It seems to me that @RequiresStableInput is ignored for the
        batch case and the runner relies on being able to recompute the
        whole job in the worst-case scenario.
        Is this assumption correct?
        Could I just change the SparkRunner to crash on the
        @RequiresStableInput annotation in streaming mode and ignore it
        in batch?



        On Wed, Jul 1, 2020 at 10:27 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:

            We have a component which we use in both streaming and batch
            jobs. We run streaming on the FlinkRunner and batch on the
            SparkRunner. Recently we needed to add @RequiresStableInput
            to that component because of the streaming use case. But now
            the batch case crashes on the SparkRunner with:

            Caused by: java.lang.UnsupportedOperationException: Spark runner currently doesn't support @RequiresStableInput annotation.
                at org.apache.beam.runners.core.construction.UnsupportedOverrideFactory.getReplacementTransform(UnsupportedOverrideFactory.java:58)
                at org.apache.beam.sdk.Pipeline.applyReplacement(Pipeline.java:556)
                at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:292)
                at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:210)
                at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:168)
                at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:90)
                at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
                at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
                at com.sizmek.dp.dsp.pipeline.driver.PipelineDriver$$anonfun$1.apply(PipelineDriver.scala:42)
                at com.sizmek.dp.dsp.pipeline.driver.PipelineDriver$$anonfun$1.apply(PipelineDriver.scala:35)
                at scala.util.Try$.apply(Try.scala:192)
                at com.dp.pipeline.driver.PipelineDriver$class.main(PipelineDriver.scala:35)


            We are using Beam 2.19.0. Is @RequiresStableInput
            problematic to support for both the streaming and batch
            use cases? What are the options here?
            https://issues.apache.org/jira/browse/BEAM-5358
