Correct: for batch, we rely on re-running the entire job, which produces
stable input within each run.
For streaming, the Flink Runner buffers all input to a
@RequiresStableInput DoFn until a checkpoint completes, and only then
processes the buffered data. Dataflow effectively does the same by
routing the input through the Shuffle service, which produces a
consistent result.
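The buffering strategy can be sketched in plain Java (a minimal simulation, not Flink or Beam code; `StableInputBuffer` and `checkpointComplete` are hypothetical names):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Minimal sketch of "buffer until checkpoint, then process": elements
// arriving at a stable-input DoFn are held back until a checkpoint
// completes, so a replay after failure sees exactly the same input.
class StableInputBuffer<T> {
    private final List<T> buffer = new ArrayList<>();
    private final Consumer<T> downstream; // stands in for the user's DoFn

    StableInputBuffer(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    // Incoming elements are only buffered, never processed immediately.
    void receive(T element) {
        buffer.add(element);
    }

    // Once the checkpoint completes, the buffered elements are part of
    // durable state, so it is safe to hand them to the DoFn.
    void checkpointComplete() {
        buffer.forEach(downstream);
        buffer.clear();
    }
}
```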
-Max
On 08.07.20 11:08, Jozef Vilcek wrote:
My last question was more about the graph translation for batch mode.
Should a DoFn with @RequiresStableInput be translated/expanded in some
specific way (e.g. DoFn -> Reshuffle + DoFn), or is this not needed for batch?
Most runners fail in the presence of @RequiresStableInput for both batch
and streaming. I cannot find a failure for Flink and Dataflow, but at the
same time I cannot find what those runners do with such a DoFn.
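For illustration, the DoFn -> Reshuffle + DoFn expansion mentioned above would amount to a graph rewrite roughly along these lines. This is a plain-Java sketch using transform names as strings; `expand` and the labels are illustrative, not actual Beam translation code:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the DoFn -> Reshuffle + DoFn expansion: walk the pipeline's
// transforms and insert a Reshuffle in front of any ParDo whose DoFn
// declares @RequiresStableInput. Strings stand in for Beam's real
// transform hierarchy.
class StableInputExpansion {
    static List<String> expand(List<String> transforms) {
        List<String> out = new ArrayList<>();
        for (String t : transforms) {
            if (t.endsWith("@RequiresStableInput")) {
                out.add("Reshuffle"); // materializes input, making it stable
            }
            out.add(t);
        }
        return out;
    }
}
```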
On Tue, Jul 7, 2020 at 9:18 PM Kenneth Knowles <k...@apache.org> wrote:
I hope someone who knows better than me can respond.
A long time ago, the SparkRunner added a call to materialize() at
every GroupByKey. This was to mimic Dataflow, since so many of the
initial IO transforms relied on using shuffle to create stable inputs.
The overall goal is to be able to remove these extra calls to
materialize() and only include them when @RequiresStableInput.
The intermediate state is to analyze whether input is already stable
from materialize() and add another materialize() only if it is not
stable.
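That intermediate state could look roughly like this (a plain-Java sketch, assuming a shuffle boundary such as GroupByKey makes input stable and any ParDo makes it unstable again; all names are illustrative, not Beam or Spark code):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of conditional materialization: instead of materializing at
// every GroupByKey, track whether the current input is already stable
// and insert a materialize step only when a stable-input DoFn would
// otherwise read unstable data.
class ConditionalMaterialize {
    static List<String> plan(List<String> transforms) {
        List<String> out = new ArrayList<>();
        boolean stable = false; // raw source input is assumed unstable
        for (String t : transforms) {
            if (t.endsWith("@RequiresStableInput") && !stable) {
                out.add("materialize");
                stable = true;
            }
            out.add(t);
            if (t.equals("GroupByKey")) {
                stable = true;  // a shuffle boundary yields stable input
            } else if (t.startsWith("ParDo")) {
                stable = false; // arbitrary user code breaks stability
            }
        }
        return out;
    }
}
```

Under these assumptions, a stable-input DoFn directly downstream of a GroupByKey needs no extra materialize() call, which matches the goal of removing the per-GroupByKey calls.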
I don't know the current state of the SparkRunner. This may already
have changed.
Kenn
On Thu, Jul 2, 2020 at 10:24 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
I was trying to find references on how other runners handle
@RequiresStableInput in the batch case; however, I was not able to
find any.
In Flink I can see added support for the streaming case, and in
Dataflow I see that support for the feature was turned off:
https://github.com/apache/beam/pull/8065
It seems to me that @RequiresStableInput is ignored in the batch
case and the runner relies on being able to recompute the whole
job in the worst-case scenario.
Is this assumption correct?
Could I just change the SparkRunner to crash on the
@RequiresStableInput annotation in streaming mode and ignore it in batch?
On Wed, Jul 1, 2020 at 10:27 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
We have a component which we use in streaming and batch jobs. We
run streaming on the FlinkRunner and batch on the SparkRunner.
Recently we needed to add @RequiresStableInput to that component
because of the streaming use case. But now the batch case crashes
on the SparkRunner with:
Caused by: java.lang.UnsupportedOperationException: Spark runner currently doesn't support @RequiresStableInput annotation.
at org.apache.beam.runners.core.construction.UnsupportedOverrideFactory.getReplacementTransform(UnsupportedOverrideFactory.java:58)
at org.apache.beam.sdk.Pipeline.applyReplacement(Pipeline.java:556)
at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:292)
at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:210)
at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:168)
at org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:90)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
at com.sizmek.dp.dsp.pipeline.driver.PipelineDriver$$anonfun$1.apply(PipelineDriver.scala:42)
at com.sizmek.dp.dsp.pipeline.driver.PipelineDriver$$anonfun$1.apply(PipelineDriver.scala:35)
at scala.util.Try$.apply(Try.scala:192)
at com.dp.pipeline.driver.PipelineDriver$class.main(PipelineDriver.scala:35)
We are using Beam 2.19.0. Is @RequiresStableInput problematic to
support for both the streaming and batch use cases? What are the
options here?
https://issues.apache.org/jira/browse/BEAM-5358