Is it possible to post the code? (Or the code of a similar, but
minimal, pipeline that exhibits the same issues?)

On Mon, Jun 14, 2021 at 2:15 PM Evan Galpin <> wrote:
> @robert I have a pipeline which consistently shows a major slowdown (10 
> seconds Vs 10 minutes) between version <=2.23.0 and >=2.25.0 that can be 
> boiled down to:
> - Read GCS file patterns from PubSub
> - Window into Fixed windows (repeating every 15 seconds)
> - Deduplicate/distinct (have tried both)
> - Read GCS blobs via patterns from the first step
> - Write file contents to sink
> It doesn't seem to matter if there are 0 messages in a subscription or 50k 
> messages at startup. The rate of new messages however is very low. Not sure 
> if those are helpful details, let me know if there's anything else specific 
> which would help.
> On Mon, Jun 14, 2021 at 12:44 PM Robert Bradshaw <> wrote:
>> +1, we'd really like to get to the bottom of this, so clear
>> instructions on a pipeline/conditions that can reproduce it would be
>> great.
>> On Mon, Jun 14, 2021 at 7:34 AM Jan Lukavský <> wrote:
>> >
>> > Hi Eddy,
>> >
>> > you are probably hitting a not-yet discovered bug in SDF implementation in 
>> > FlinkRunner that (under some currently unknown conditions) seems to stop 
>> > advancing the watermark. This has been observed in one other instance 
>> > (that I'm aware of). I think we don't yet have a tracking JIRA for that, 
>> > would you mind filling it? It would be awesome if you could include 
>> > estimations of messages per sec throughput that causes the issue in your 
>> > case.
>> >
>> > +Tobias Kaymak
>> >
>> > Tobias, could you please confirm that the case you had with Flink stopping 
>> > progressing watermark resembled this one?
>> >
>> > Thanks.
>> >
>> >  Jan
>> >
>> > On 6/14/21 4:11 PM, Eddy G wrote:
>> >
>> > Hi Jan,
>> >
>> > I've added --experiments=use_deprecated_read and it seems to work 
>> > flawlessly (with my current Window and the one proposed by Evan).
>> >
>> > Why is this? Do Splittable DoFn now break current implementations? Are 
>> > there any posts of possible breaking changes?
>> >
>> > On 2021/06/14 13:19:39, Jan Lukavský <> wrote:
>> >
>> > Hi Eddy,
>> >
>> > answers inline.
>> >
>> > On 6/14/21 3:05 PM, Eddy G wrote:
>> >
>> > Hi Jan,
>> >
>> > Thanks for replying so fast!
>> >
>> > Regarding your questions,
>> >
>> > - "Does your data get buffered in a state?"
>> > Yes, I do have a state within a stage prior ParquetIO writing together 
>> > with a Timer with PROCESSING_TIME.
>> >
>> > The stage which contains the state does send bytes to the next one which 
>> > is the ParquetIO writing. Seems the @OnTimer doesn't get triggered and 
>> > it's not clearing the state. This however does work under normal 
>> > circumstances without having too much data queued waiting to be processed.
>> >
>> > OK, this suggests, that the watermark is for some reason "stuck". If you
>> > checkpoints enabled, you should see the size of the checkpoint to grow
>> > over time.
>> >
>> > - "Do you see watermark being updated in your Flink WebUI?"
>> > The stages that do have a watermark don't get updated. The same watermark 
>> > value has been constant since the pipeline started.
>> >
>> > If no lateness is set, any late data should be admitted right?
>> >
>> > If no lateness is set, it means allowed lateness of Duration.ZERO, which
>> > means that data that arrive after end-of-window will be dropped.
>> >
>> > Regarding 'droppedDueToLateness' metric, can't see it exposed anywhere, 
>> > neither in Flink UI or Prometheus. I've seen it in Dataflow but seems to 
>> > be a Dataflow specific metric right?
>> >
>> > Should not be Dataflow specific. But if you don't see it, it means it
>> > could be zero. So, we can rule this out.
>> >
>> > We're using KinesisIO for reading messages.
>> >
>> > Kinesis uses UnboundedSource, which is expended to SDF starting from
>> > Beam 2.25.0. The flag should change that as well. Can you try the
>> > --experiments=use_deprecated_read and see if you Pipeline DAG changes
>> > (should not contain Impulse transform at the beginning) and if it solves
>> > your issues?
>> >
>> > On 2021/06/14 12:48:58, Jan Lukavský <> wrote:
>> >
>> > Hi Eddy,
>> >
>> > does your data get buffered in a state - e.g. does the size of the state
>> > grow over time? Do you see watermark being updated in your Flink WebUI?
>> > When a stateful operation (and GroupByKey is a stateful operation) does
>> > not output any data, the first place to look at is if watermark
>> > correctly progresses. If it does not progress, then the input data must
>> > be buffered in state and the size of the state should grow over time. If
>> > it progresses, then it might be the case, that the data is too late
>> > after the watermark (the watermark estimator might need tuning) and the
>> > data gets dropped (note you don't set any allowed lateness, which
>> > _might_ cause issues). You could see if your pipeline drops data in
>> > "droppedDueToLateness" metric. The size of you state would not grow much
>> > in that situation.
>> >
>> > Another hint - If you use KafkaIO, try to disable SDF wrapper for it
>> > using "--experiments=use_deprecated_read" on command line (which you
>> > then must pass to PipelineOptionsFactory). There is some suspicion that
>> > SDF wrapper for Kafka might not work as expected in certain situations
>> > with Flink.
>> >
>> > Please feel free to share any results,
>> >
>> >     Jan
>> >
>> > On 6/14/21 1:39 PM, Eddy G wrote:
>> >
>> > As seen in this image, I'm trying to deal with 
>> > late data (intentionally stopped my consumer so data has been accumulating 
>> > for several days now). Now, with the following Window... I'm using Beam 
>> > 2.27 and Flink 1.12.
>> >
>> >                               
>> > Window.into(FixedWindows.of(Duration.standardMinutes(10)))
>> >
>> > And several parsing stages after, once it's time to write within the 
>> > ParquetIO stage...
>> >
>> >                               FileIO
>> >                                   .<String, MyClass>writeDynamic()
>> >                                   .by(...)
>> >                                   .via(...)
>> >                                   .to(...)
>> >                                   .withNaming(...)
>> >                                   
>> > .withDestinationCoder(StringUtf8Coder.of())
>> >                                   .withNumShards(options.getNumShards())
>> >
>> > it won't send bytes across all stages so no data is being written, still 
>> > it accumulates in the first stage seen in the image and won't go further 
>> > than that.
>> >
>> > Any reason why this may be happening? Wrong windowing strategy?

Reply via email to