There have been various reports of slowness loosely attributed to the SDF
default wrapper change in 2.25.0. For example:

https://lists.apache.org/thread.html/re6b0941a8b4951293a0327ce9b25e607cafd6e45b69783f65290edee%40%3Cdev.beam.apache.org%3E

https://issues.apache.org/jira/plugins/servlet/mobile#issue/BEAM-10670/comment/17316858

http://mail-archives.apache.org/mod_mbox/beam-dev/202105.mbox/%3ccae7uba_v0vfl9ck7n06n2zf6e+xcebdircez7yftlfwuvch...@mail.gmail.com%3e
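
For anyone affected, the workaround discussed below in this thread is to
switch back to the pre-2.25.0 read translation with
--experiments=use_deprecated_read. A minimal sketch of wiring that flag
through (assuming the command-line args are forwarded to
PipelineOptionsFactory; the runner name is just an example):

    // e.g. run with: --runner=FlinkRunner --experiments=use_deprecated_read
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline pipeline = Pipeline.create(options);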



On Mon, Jun 14, 2021 at 10:11 Eddy G <[email protected]> wrote:

> Hi Jan,
>
> I've added --experiments=use_deprecated_read and it seems to work
> flawlessly (with my current Window and the one proposed by Evan).
>
> Why is this? Do Splittable DoFns now break existing implementations? Are
> there any posts about possible breaking changes?
>
> On 2021/06/14 13:19:39, Jan Lukavský <[email protected]> wrote:
> > Hi Eddy,
> >
> > answers inline.
> >
> > On 6/14/21 3:05 PM, Eddy G wrote:
> > > Hi Jan,
> > >
> > > Thanks for replying so fast!
> > >
> > > Regarding your questions,
> > >
> > > - "Does your data get buffered in a state?"
> > > Yes, I do have state in a stage prior to the ParquetIO write, together
> > > with a Timer on PROCESSING_TIME.
> > >
> > > The stage which contains the state does send bytes to the next one,
> > > which is the ParquetIO write. It seems the @OnTimer doesn't get
> > > triggered, so it's not clearing the state. This does however work under
> > > normal circumstances, without too much data queued waiting to be
> > > processed.
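> > >
> > > Roughly, that stage is shaped like this (a simplified sketch; the
> > > class, field names and flush interval here are illustrative, not the
> > > exact code):
> > >
> > >     // uses org.apache.beam.sdk.state.*, org.apache.beam.sdk.transforms.DoFn
> > >     // and org.joda.time.Duration
> > >     class BufferBeforeParquet extends DoFn<KV<String, MyClass>, MyClass> {
> > >
> > >       @StateId("buffer")
> > >       private final StateSpec<BagState<MyClass>> bufferSpec = StateSpecs.bag();
> > >
> > >       @TimerId("flush")
> > >       private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
> > >
> > >       @ProcessElement
> > >       public void process(
> > >           @Element KV<String, MyClass> element,
> > >           @StateId("buffer") BagState<MyClass> buffer,
> > >           @TimerId("flush") Timer flush) {
> > >         buffer.add(element.getValue());
> > >         // (re)arm a processing-time timer that should flush the buffer periodically
> > >         flush.offset(Duration.standardMinutes(1)).setRelative();
> > >       }
> > >
> > >       @OnTimer("flush")
> > >       public void onFlush(
> > >           OnTimerContext context,
> > >           @StateId("buffer") BagState<MyClass> buffer) {
> > >         for (MyClass buffered : buffer.read()) {
> > >           context.output(buffered);   // handed on to the FileIO/ParquetIO write
> > >         }
> > >         buffer.clear();
> > >       }
> > >     }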
> > OK, this suggests that the watermark is for some reason "stuck". If you
> > have checkpointing enabled, you should see the size of the checkpoint
> > grow over time.
> > >
> > > - "Do you see watermark being updated in your Flink WebUI?"
> > > The stages that do have a watermark don't get updated. The same
> watermark value has been constant since the pipeline started.
> > >
> > > If no lateness is set, any late data should be admitted right?
> > If no lateness is set, it means an allowed lateness of Duration.ZERO,
> > which means that data arriving after the end of the window will be
> > dropped.
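> > If you want such data to be accepted, you can set an explicit allowed
> > lateness on the window, e.g. something like this (the seven days is just
> > an illustrative value; it should cover how long your consumer was
> > stopped):
> >
> >     Window.<MyClass>into(FixedWindows.of(Duration.standardMinutes(10)))
> >         .withAllowedLateness(Duration.standardDays(7))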
> > >
> > > Regarding the 'droppedDueToLateness' metric, I can't see it exposed
> > > anywhere, neither in the Flink UI nor in Prometheus. I've seen it in
> > > Dataflow, but it seems to be a Dataflow-specific metric, right?
> > It should not be Dataflow specific. But if you don't see it, it could be
> > zero, so we can rule this out.
> > >
> > > We're using KinesisIO for reading messages.
> > Kinesis uses UnboundedSource, which is expanded to SDF starting from
> > Beam 2.25.0. The flag should change that as well. Can you try
> > --experiments=use_deprecated_read and see if your Pipeline DAG changes
> > (it should not contain an Impulse transform at the beginning) and if it
> > solves your issues?
> > >
> > > On 2021/06/14 12:48:58, Jan Lukavský <[email protected]> wrote:
> > >> Hi Eddy,
> > >>
> > >> does your data get buffered in state - e.g. does the size of the
> > >> state grow over time? Do you see the watermark being updated in your
> > >> Flink WebUI? When a stateful operation (and GroupByKey is a stateful
> > >> operation) does not output any data, the first place to look is
> > >> whether the watermark progresses correctly. If it does not progress,
> > >> then the input data must be buffered in state and the size of the
> > >> state should grow over time. If it progresses, then it might be the
> > >> case that the data is too late after the watermark (the watermark
> > >> estimator might need tuning) and the data gets dropped (note you don't
> > >> set any allowed lateness, which _might_ cause issues). You could check
> > >> whether your pipeline drops data in the "droppedDueToLateness" metric.
> > >> The size of your state would not grow much in that situation.
> > >>
> > >> Another hint - if you use KafkaIO, try to disable the SDF wrapper for
> > >> it using "--experiments=use_deprecated_read" on the command line
> > >> (which you then must pass to PipelineOptionsFactory). There is some
> > >> suspicion that the SDF wrapper for Kafka might not work as expected in
> > >> certain situations with Flink.
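> > >>
> > >> If you build your options programmatically rather than purely from
> > >> command-line args, I think something along these lines should have the
> > >> same effect (untested sketch):
> > >>
> > >>     PipelineOptions opts =
> > >>         PipelineOptionsFactory.fromArgs(args).withValidation().create();
> > >>     ExperimentalOptions.addExperiment(
> > >>         opts.as(ExperimentalOptions.class), "use_deprecated_read");
> > >>     Pipeline pipeline = Pipeline.create(opts);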
> > >>
> > >> Please feel free to share any results,
> > >>
> > >>     Jan
> > >>
> > >> On 6/14/21 1:39 PM, Eddy G wrote:
> > >>> As seen in this image https://imgur.com/a/wrZET97, I'm trying to
> > >>> deal with late data (I intentionally stopped my consumer, so data has
> > >>> been accumulating for several days now). I'm using Beam 2.27 and
> > >>> Flink 1.12. Now, with the following Window...
> > >>>
> > >>>
> > >>>     Window.into(FixedWindows.of(Duration.standardMinutes(10)))
> > >>>
> > >>> And after several parsing stages, once it's time to write within the
> > >>> ParquetIO stage...
> > >>>
> > >>>     FileIO
> > >>>         .<String, MyClass>writeDynamic()
> > >>>         .by(...)
> > >>>         .via(...)
> > >>>         .to(...)
> > >>>         .withNaming(...)
> > >>>         .withDestinationCoder(StringUtf8Coder.of())
> > >>>         .withNumShards(options.getNumShards())
> > >>>
> > >>> it won't send bytes across all stages, so no data is being written;
> > >>> it just accumulates in the first stage seen in the image and won't go
> > >>> further than that.
> > >>>
> > >>> Any reason why this may be happening? A wrong windowing strategy?
> >
>
