@robert I have a pipeline which consistently shows a major slowdown (10
seconds Vs 10 minutes) between version <=2.23.0 and >=2.25.0 that can be
boiled down to:

- Read GCS file patterns from PubSub
- Window into Fixed windows (repeating every 15 seconds)
- Deduplicate/distinct (have tried both)
- Read GCS blobs via patterns from the first step
- Write file contents to sink

It doesn't seem to matter if there are 0 messages in a subscription or 50k
messages at startup. The rate of new messages however is very low. Not sure
if those are helpful details, let me know if there's anything else specific
which would help.

On Mon, Jun 14, 2021 at 12:44 PM Robert Bradshaw <[email protected]>
wrote:

> +1, we'd really like to get to the bottom of this, so clear
> instructions on a pipeline/conditions that can reproduce it would be
> great.
>
> On Mon, Jun 14, 2021 at 7:34 AM Jan Lukavský <[email protected]> wrote:
> >
> > Hi Eddy,
> >
> > you are probably hitting a not-yet discovered bug in SDF implementation
> in FlinkRunner that (under some currently unknown conditions) seems to stop
> advancing the watermark. This has been observed in one other instance (that
> I'm aware of). I think we don't yet have a tracking JIRA for that, would
> you mind filling it? It would be awesome if you could include estimations
> of messages per sec throughput that causes the issue in your case.
> >
> > +Tobias Kaymak
> >
> > Tobias, could you please confirm that the case you had with Flink
> stopping progressing watermark resembled this one?
> >
> > Thanks.
> >
> >  Jan
> >
> > On 6/14/21 4:11 PM, Eddy G wrote:
> >
> > Hi Jan,
> >
> > I've added --experiments=use_deprecated_read and it seems to work
> flawlessly (with my current Window and the one proposed by Evan).
> >
> > Why is this? Do Splittable DoFn now break current implementations? Are
> there any posts of possible breaking changes?
> >
> > On 2021/06/14 13:19:39, Jan Lukavský <[email protected]> wrote:
> >
> > Hi Eddy,
> >
> > answers inline.
> >
> > On 6/14/21 3:05 PM, Eddy G wrote:
> >
> > Hi Jan,
> >
> > Thanks for replying so fast!
> >
> > Regarding your questions,
> >
> > - "Does your data get buffered in a state?"
> > Yes, I do have a state within a stage prior ParquetIO writing together
> with a Timer with PROCESSING_TIME.
> >
> > The stage which contains the state does send bytes to the next one which
> is the ParquetIO writing. Seems the @OnTimer doesn't get triggered and it's
> not clearing the state. This however does work under normal circumstances
> without having too much data queued waiting to be processed.
> >
> > OK, this suggests, that the watermark is for some reason "stuck". If you
> > checkpoints enabled, you should see the size of the checkpoint to grow
> > over time.
> >
> > - "Do you see watermark being updated in your Flink WebUI?"
> > The stages that do have a watermark don't get updated. The same
> watermark value has been constant since the pipeline started.
> >
> > If no lateness is set, any late data should be admitted right?
> >
> > If no lateness is set, it means allowed lateness of Duration.ZERO, which
> > means that data that arrive after end-of-window will be dropped.
> >
> > Regarding 'droppedDueToLateness' metric, can't see it exposed anywhere,
> neither in Flink UI or Prometheus. I've seen it in Dataflow but seems to be
> a Dataflow specific metric right?
> >
> > Should not be Dataflow specific. But if you don't see it, it means it
> > could be zero. So, we can rule this out.
> >
> > We're using KinesisIO for reading messages.
> >
> > Kinesis uses UnboundedSource, which is expended to SDF starting from
> > Beam 2.25.0. The flag should change that as well. Can you try the
> > --experiments=use_deprecated_read and see if you Pipeline DAG changes
> > (should not contain Impulse transform at the beginning) and if it solves
> > your issues?
> >
> > On 2021/06/14 12:48:58, Jan Lukavský <[email protected]> wrote:
> >
> > Hi Eddy,
> >
> > does your data get buffered in a state - e.g. does the size of the state
> > grow over time? Do you see watermark being updated in your Flink WebUI?
> > When a stateful operation (and GroupByKey is a stateful operation) does
> > not output any data, the first place to look at is if watermark
> > correctly progresses. If it does not progress, then the input data must
> > be buffered in state and the size of the state should grow over time. If
> > it progresses, then it might be the case, that the data is too late
> > after the watermark (the watermark estimator might need tuning) and the
> > data gets dropped (note you don't set any allowed lateness, which
> > _might_ cause issues). You could see if your pipeline drops data in
> > "droppedDueToLateness" metric. The size of you state would not grow much
> > in that situation.
> >
> > Another hint - If you use KafkaIO, try to disable SDF wrapper for it
> > using "--experiments=use_deprecated_read" on command line (which you
> > then must pass to PipelineOptionsFactory). There is some suspicion that
> > SDF wrapper for Kafka might not work as expected in certain situations
> > with Flink.
> >
> > Please feel free to share any results,
> >
> >     Jan
> >
> > On 6/14/21 1:39 PM, Eddy G wrote:
> >
> > As seen in this image https://imgur.com/a/wrZET97, I'm trying to deal
> with late data (intentionally stopped my consumer so data has been
> accumulating for several days now). Now, with the following Window... I'm
> using Beam 2.27 and Flink 1.12.
> >
> >
>  Window.into(FixedWindows.of(Duration.standardMinutes(10)))
> >
> > And several parsing stages after, once it's time to write within the
> ParquetIO stage...
> >
> >                               FileIO
> >                                   .<String, MyClass>writeDynamic()
> >                                   .by(...)
> >                                   .via(...)
> >                                   .to(...)
> >                                   .withNaming(...)
> >
>  .withDestinationCoder(StringUtf8Coder.of())
> >                                   .withNumShards(options.getNumShards())
> >
> > it won't send bytes across all stages so no data is being written, still
> it accumulates in the first stage seen in the image and won't go further
> than that.
> >
> > Any reason why this may be happening? Wrong windowing strategy?
>

Reply via email to