My last question was more about graph translation in batch mode.

Should a DoFn with @RequiresStableInput be translated/expanded in some
specific way (e.g. DoFn -> Reshuffle + DoFn), or is that not needed for batch?
Most runners fail in the presence of @RequiresStableInput for both batch
and streaming. I cannot find such a failure for Flink and Dataflow, but at
the same time I cannot find what those runners do with such a DoFn.
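
To make the question concrete, the kind of expansion I have in mind could be sketched roughly as below. This is a toy model in plain Java, not Beam or runner code: Step, expand, and the idea that a step named "Reshuffle" counts as a stable checkpoint are all assumptions made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class StableInputExpansion {

    /** Hypothetical stand-in for a pipeline step; this is not a Beam class. */
    static final class Step {
        final String name;
        final boolean requiresStableInput;

        Step(String name, boolean requiresStableInput) {
            this.name = name;
            this.requiresStableInput = requiresStableInput;
        }
    }

    /** Assumption: a step with this name makes its output stable. */
    static final String CHECKPOINT = "Reshuffle";

    /**
     * Returns the step names after expansion. A checkpoint is inserted in
     * front of every step that requires stable input, unless the step
     * directly before it is already a checkpoint.
     */
    static List<String> expand(List<Step> steps) {
        List<String> expanded = new ArrayList<>();
        for (Step step : steps) {
            boolean inputAlreadyStable = !expanded.isEmpty()
                    && CHECKPOINT.equals(expanded.get(expanded.size() - 1));
            if (step.requiresStableInput && !inputAlreadyStable) {
                expanded.add(CHECKPOINT);
            }
            expanded.add(step.name);
        }
        return expanded;
    }

    public static void main(String[] args) {
        List<Step> steps = new ArrayList<>();
        steps.add(new Step("Read", false));
        steps.add(new Step("Parse", false));
        steps.add(new Step("Write", true));
        System.out.println(expand(steps)); // prints [Read, Parse, Reshuffle, Write]
    }
}
```

Whether a real runner needs anything like this in batch mode, or can simply rely on recomputing the job, is exactly what I am unsure about.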

On Tue, Jul 7, 2020 at 9:18 PM Kenneth Knowles <> wrote:

> I hope someone who knows better than me can respond.
> A long time ago, the SparkRunner added a call to materialize() at every
> GroupByKey. This was to mimic Dataflow, since so many of the initial IO
> transforms relied on using shuffle to create stable inputs.
> The overall goal is to be able to remove these extra calls to
> materialize() and only include them when @RequiresStableInput is present.
> The intermediate state is to analyze whether input is already stable from
> materialize() and add another materialize() only if it is not stable.
> I don't know the current state of the SparkRunner. This may already have
> changed.
> Kenn
> On Thu, Jul 2, 2020 at 10:24 PM Jozef Vilcek <>
> wrote:
>> I was trying to find references on how other runners handle
>> @RequiresStableInput for batch cases, but I was not able to find any.
>> In Flink I can see that support was added for the streaming case, and in
>> Dataflow I see that support for the feature was turned off.
>> It seems to me that @RequiresStableInput is ignored for the batch case
>> and the runner relies on being able to recompute the whole job in the worst
>> case scenario.
>> Is this assumption correct?
>> Could I just change SparkRunner to crash on @RequiresStableInput
>> annotation for streaming mode and ignore it in batch?
>> On Wed, Jul 1, 2020 at 10:27 AM Jozef Vilcek <>
>> wrote:
>>> We have a component which we use in streaming and batch jobs.
>>> We run streaming on FlinkRunner and batch on SparkRunner. Recently we
>>> needed to add @RequiresStableInput to that component because of the
>>> streaming use case. But now the batch case crashes on SparkRunner with:
>>> Caused by: java.lang.UnsupportedOperationException: Spark runner currently doesn't support @RequiresStableInput annotation.
>>>     at 
>>>     at org.apache.beam.sdk.Pipeline.applyReplacement(
>>>     at org.apache.beam.sdk.Pipeline.replace(
>>>     at org.apache.beam.sdk.Pipeline.replaceAll(
>>>     at
>>>     at
>>>     at
>>>     at
>>>     at com.sizmek.dp.dsp.pipeline.driver.PipelineDriver$$anonfun$1.apply(PipelineDriver.scala:42)
>>>     at com.sizmek.dp.dsp.pipeline.driver.PipelineDriver$$anonfun$1.apply(PipelineDriver.scala:35)
>>>     at scala.util.Try$.apply(Try.scala:192)
>>>     at com.dp.pipeline.driver.PipelineDriver$class.main(PipelineDriver.scala:35)
>>> We are using Beam 2.19.0. Is @RequiresStableInput problematic to
>>> support for both streaming and batch use cases? What are the options here?
