I’ll try to create something as small as possible from the pipeline I
mentioned 👍 I should have time this week to do so.

Thanks,
Evan

On Mon, Jun 14, 2021 at 18:09 Robert Bradshaw <[email protected]> wrote:

> Is it possible to post the code? (Or the code of a similar, but
> minimal, pipeline that exhibits the same issues?)
>
> On Mon, Jun 14, 2021 at 2:15 PM Evan Galpin <[email protected]> wrote:
> >
> > @robert I have a pipeline which consistently shows a major slowdown (10
> seconds vs. 10 minutes) between versions <=2.23.0 and >=2.25.0 that can be
> boiled down to:
> >
> > - Read GCS file patterns from PubSub
> > - Window into Fixed windows (repeating every 15 seconds)
> > - Deduplicate/distinct (have tried both)
> > - Read GCS blobs via patterns from the first step
> > - Write file contents to sink
> >
> > It doesn't seem to matter if there are 0 messages in a subscription or
> 50k messages at startup. The rate of new messages however is very low. Not
> sure if those are helpful details, let me know if there's anything else
> specific which would help.
> >
> > On Mon, Jun 14, 2021 at 12:44 PM Robert Bradshaw <[email protected]>
> wrote:
> >>
> >> +1, we'd really like to get to the bottom of this, so clear
> >> instructions on a pipeline/conditions that can reproduce it would be
> >> great.
> >>
> >> On Mon, Jun 14, 2021 at 7:34 AM Jan Lukavský <[email protected]> wrote:
> >> >
> >> > Hi Eddy,
> >> >
> >> > you are probably hitting a not-yet discovered bug in SDF
> implementation in FlinkRunner that (under some currently unknown
> conditions) seems to stop advancing the watermark. This has been observed
> in one other instance (that I'm aware of). I think we don't yet have a
> tracking JIRA for that, would you mind filing one? It would be awesome if
> you could include an estimate of the message throughput (messages per
> second) that causes the issue in your case.
> >> >
> >> > +Tobias Kaymak
> >> >
> >> > Tobias, could you please confirm that the case you had with Flink
> no longer advancing the watermark resembled this one?
> >> >
> >> > Thanks.
> >> >
> >> >  Jan
> >> >
> >> > On 6/14/21 4:11 PM, Eddy G wrote:
> >> >
> >> > Hi Jan,
> >> >
> >> > I've added --experiments=use_deprecated_read and it seems to work
> flawlessly (with my current Window and the one proposed by Evan).
> >> >
> >> > Why is this? Do Splittable DoFns now break current implementations?
> Are there any posts about possible breaking changes?
> >> >
> >> > On 2021/06/14 13:19:39, Jan Lukavský <[email protected]> wrote:
> >> >
> >> > Hi Eddy,
> >> >
> >> > answers inline.
> >> >
> >> > On 6/14/21 3:05 PM, Eddy G wrote:
> >> >
> >> > Hi Jan,
> >> >
> >> > Thanks for replying so fast!
> >> >
> >> > Regarding your questions,
> >> >
> >> > - "Does your data get buffered in a state?"
> >> > Yes, I do have a state within a stage prior to the ParquetIO write,
> together with a Timer with PROCESSING_TIME.
> >> >
> >> > The stage which contains the state does send bytes to the next one
> which is the ParquetIO writing. Seems the @OnTimer doesn't get triggered
> and it's not clearing the state. This however does work under normal
> circumstances without having too much data queued waiting to be processed.
> >> >
> >> > OK, this suggests that the watermark is for some reason "stuck". If you
> >> > have checkpoints enabled, you should see the size of the checkpoint grow
> >> > over time.
> >> >
> >> > - "Do you see watermark being updated in your Flink WebUI?"
> >> > The stages that do have a watermark don't get updated. The same
> watermark value has been constant since the pipeline started.
> >> >
> >> > If no lateness is set, any late data should be admitted right?
> >> >
> >> > If no lateness is set, it means allowed lateness of Duration.ZERO,
> which
> >> > means that data that arrive after end-of-window will be dropped.
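
The rule described above (no allowed lateness set means Duration.ZERO, so elements whose window has already ended relative to the watermark are dropped) can be modeled roughly as follows. This is a simplified stand-alone sketch in plain Java, not Beam's implementation: the names LatenessSketch, windowStart and isDroppable are made up for illustration, and windows are assumed to be aligned to the epoch with no offset.

```java
import java.time.Duration;
import java.time.Instant;

// Simplified model of fixed windows and allowed lateness.
// Hypothetical names; this is NOT Beam's actual code.
final class LatenessSketch {

    // Start of the epoch-aligned fixed window that contains the timestamp.
    static Instant windowStart(Instant timestamp, Duration size) {
        long sizeMs = size.toMillis();
        long tsMs = timestamp.toEpochMilli();
        return Instant.ofEpochMilli(tsMs - Math.floorMod(tsMs, sizeMs));
    }

    // An element is considered droppable once the watermark has reached
    // the end of its window plus the allowed lateness.
    static boolean isDroppable(Instant timestamp, Instant watermark,
                               Duration size, Duration allowedLateness) {
        Instant windowEnd = windowStart(timestamp, size).plus(size);
        return !watermark.isBefore(windowEnd.plus(allowedLateness));
    }

    public static void main(String[] args) {
        Duration size = Duration.ofMinutes(10);
        Instant element = Instant.parse("2021-06-14T11:37:00Z");
        Instant watermark = Instant.parse("2021-06-14T11:45:00Z");

        // Same element and watermark, two different lateness settings.
        System.out.println(isDroppable(element, watermark, size, Duration.ZERO));
        System.out.println(isDroppable(element, watermark, size, Duration.ofDays(1)));
    }
}
```

In this model, an element stamped 11:37 falls into the 11:30 to 11:40 window; with a watermark at 11:45 it is droppable under zero allowed lateness but not under an allowed lateness of one day.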
> >> >
> >> > Regarding the 'droppedDueToLateness' metric, I can't see it exposed
> anywhere, neither in the Flink UI nor in Prometheus. I've seen it in
> Dataflow, but it seems to be a Dataflow-specific metric, right?
> >> >
> >> > Should not be Dataflow specific. But if you don't see it, it means it
> >> > could be zero. So, we can rule this out.
> >> >
> >> > We're using KinesisIO for reading messages.
> >> >
> >> > Kinesis uses UnboundedSource, which is expanded to SDF starting from
> >> > Beam 2.25.0. The flag should change that as well. Can you try the
> >> > --experiments=use_deprecated_read and see if your pipeline DAG changes
> >> > (it should not contain an Impulse transform at the beginning) and if it
> >> > solves your issues?
> >> >
> >> > On 2021/06/14 12:48:58, Jan Lukavský <[email protected]> wrote:
> >> >
> >> > Hi Eddy,
> >> >
> >> > does your data get buffered in a state - e.g. does the size of the
> state
> >> > grow over time? Do you see watermark being updated in your Flink
> WebUI?
> >> > When a stateful operation (and GroupByKey is a stateful operation)
> does
> >> > not output any data, the first place to look at is if watermark
> >> > correctly progresses. If it does not progress, then the input data
> must
> >> > be buffered in state and the size of the state should grow over time.
> If
> >> > it progresses, then it might be the case, that the data is too late
> >> > after the watermark (the watermark estimator might need tuning) and
> the
> >> > data gets dropped (note you don't set any allowed lateness, which
> >> > _might_ cause issues). You could check whether your pipeline drops data
> >> > via the "droppedDueToLateness" metric. The size of your state would not
> >> > grow much in that situation.
> >> >
> >> > Another hint - If you use KafkaIO, try to disable SDF wrapper for it
> >> > using "--experiments=use_deprecated_read" on command line (which you
> >> > then must pass to PipelineOptionsFactory). There is some suspicion
> that
> >> > SDF wrapper for Kafka might not work as expected in certain situations
> >> > with Flink.
> >> >
> >> > Please feel free to share any results,
> >> >
> >> >     Jan
> >> >
> >> > On 6/14/21 1:39 PM, Eddy G wrote:
> >> >
> >> > As seen in this image https://imgur.com/a/wrZET97, I'm trying to
> deal with late data (intentionally stopped my consumer so data has been
> accumulating for several days now). Now, with the following Window... I'm
> using Beam 2.27 and Flink 1.12.
> >> >
> >> >     Window.into(FixedWindows.of(Duration.standardMinutes(10)))
> >> >
> >> > And several parsing stages after, once it's time to write within the
> ParquetIO stage...
> >> >
> >> >     FileIO
> >> >         .<String, MyClass>writeDynamic()
> >> >         .by(...)
> >> >         .via(...)
> >> >         .to(...)
> >> >         .withNaming(...)
> >> >         .withDestinationCoder(StringUtf8Coder.of())
> >> >         .withNumShards(options.getNumShards())
> >> >
> >> > it won't send bytes across all stages, so no data is being written;
> instead, it accumulates in the first stage seen in the image and won't go
> further than that.
> >> >
> >> > Any reason why this may be happening? Wrong windowing strategy?
>
