My last question was more about the graph translation for batch mode.

Should a DoFn with @RequiresStableInput be translated/expanded in some
specific way (e.g. DoFn -> Reshuffle + DoFn), or is that not needed for batch?
Most runners fail in the presence of @RequiresStableInput for both batch
and streaming. I cannot find any such failure for Flink and Dataflow, but at
the same time I cannot find what those runners do with such a DoFn.
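
To make the question concrete, here is a rough sketch of the intermediate
approach Kenn describes in the quoted reply below: add a materialization
before a stable-input DoFn only when its input is not already stable. This is
a toy model in plain Java, not SparkRunner code. Kind, planStableInput and
the step names are hypothetical, no Beam or Spark API is used, and it assumes
a linear pipeline where only a materialization step yields stable output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StableInputPlanner {
    /** Hypothetical step kinds for illustration; not Beam or Spark types. */
    enum Kind { SOURCE, PAR_DO, STABLE_PAR_DO, GROUP_BY_KEY, MATERIALIZE }

    /**
     * Walks a linear chain of steps and inserts MATERIALIZE in front of a
     * STABLE_PAR_DO only when its input is not already stable.
     * Assumption: the output of MATERIALIZE is stable, and the output of
     * every other step may change if that step is retried.
     */
    static List<Kind> planStableInput(List<Kind> steps) {
        List<Kind> planned = new ArrayList<>();
        boolean inputStable = false;
        for (Kind step : steps) {
            if (step == Kind.STABLE_PAR_DO && !inputStable) {
                planned.add(Kind.MATERIALIZE);
            }
            planned.add(step);
            // Only a materialization makes the next step's input stable.
            inputStable = (step == Kind.MATERIALIZE);
        }
        return planned;
    }

    public static void main(String[] args) {
        // Unstable input: a MATERIALIZE is inserted before the stable DoFn.
        System.out.println(planStableInput(
            Arrays.asList(Kind.SOURCE, Kind.PAR_DO, Kind.STABLE_PAR_DO)));
        // Input already materialized after the GroupByKey: nothing is added.
        System.out.println(planStableInput(
            Arrays.asList(Kind.SOURCE, Kind.GROUP_BY_KEY, Kind.MATERIALIZE,
                          Kind.STABLE_PAR_DO)));
    }
}
```

Whether batch needs this at all, or can rely on recomputing the whole job,
is exactly what I am unsure about.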

On Tue, Jul 7, 2020 at 9:18 PM Kenneth Knowles <k...@apache.org> wrote:

> I hope someone who knows better than me can respond.
>
> A long time ago, the SparkRunner added a call to materialize() at every
> GroupByKey. This was to mimic Dataflow, since so many of the initial IO
> transforms relied on using shuffle to create stable inputs.
>
> The overall goal is to be able to remove these extra calls to
> materialize() and only include them when @RequiresStableInput.
>
> The intermediate state is to analyze whether input is already stable from
> materialize() and add another materialize() only if it is not stable.
>
> I don't know the current state of the SparkRunner. This may already have
> changed.
>
> Kenn
>
> On Thu, Jul 2, 2020 at 10:24 PM Jozef Vilcek <jozo.vil...@gmail.com>
> wrote:
>
>> I was trying to look for references on how other runners handle
>> @RequiresStableInput for batch cases, however I was not able to find any.
>> In Flink I can see support added for the streaming case, and in Dataflow I
>> see that support for the feature was turned off:
>> https://github.com/apache/beam/pull/8065
>>
>> It seems to me that @RequiresStableInput is ignored for the batch case
>> and the runner relies on being able to recompute the whole job in the worst
>> case scenario.
>> Is this assumption correct?
>> Could I just change SparkRunner to crash on @RequiresStableInput
>> annotation for streaming mode and ignore it in batch?
>>
>>
>>
>> On Wed, Jul 1, 2020 at 10:27 AM Jozef Vilcek <jozo.vil...@gmail.com>
>> wrote:
>>
>>> We have a component which we use in streaming and batch jobs.
>>> We run streaming on FlinkRunner and batch on SparkRunner. Recently we
>>> needed to add @RequiresStableInput to that component because of the
>>> streaming use case. But now the batch case crashes on SparkRunner with
>>>
>>> Caused by: java.lang.UnsupportedOperationException: Spark runner currently doesn't support @RequiresStableInput annotation.
>>>     at org.apache.beam.runners.core.construction.UnsupportedOverrideFactory.getReplacementTransform(UnsupportedOverrideFactory.java:58)
>>>     at org.apache.beam.sdk.Pipeline.applyReplacement(Pipeline.java:556)
>>>     at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:292)
>>>     at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:210)
>>>     at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:168)
>>>     at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:90)
>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
>>>     at com.sizmek.dp.dsp.pipeline.driver.PipelineDriver$$anonfun$1.apply(PipelineDriver.scala:42)
>>>     at com.sizmek.dp.dsp.pipeline.driver.PipelineDriver$$anonfun$1.apply(PipelineDriver.scala:35)
>>>     at scala.util.Try$.apply(Try.scala:192)
>>>     at com.dp.pipeline.driver.PipelineDriver$class.main(PipelineDriver.scala:35)
>>>
>>>
>>> We are using Beam 2.19.0. Is @RequiresStableInput problematic to
>>> support for both the streaming and batch use cases? What are the options
>>> here?
>>> https://issues.apache.org/jira/browse/BEAM-5358
>>>
>>>
