+1, we'd really like to get to the bottom of this, so clear
instructions on a pipeline/conditions that can reproduce it would be
great.

On Mon, Jun 14, 2021 at 7:34 AM Jan Lukavský <je...@seznam.cz> wrote:
>
> Hi Eddy,
>
> you are probably hitting a not-yet discovered bug in SDF implementation in 
> FlinkRunner that (under some currently unknown conditions) seems to stop 
> advancing the watermark. This has been observed in one other instance (that 
> I'm aware of). I think we don't yet have a tracking JIRA for that, would you 
> mind filling it? It would be awesome if you could include estimations of 
> messages per sec throughput that causes the issue in your case.
>
> +Tobias Kaymak
>
> Tobias, could you please confirm that the case you had with Flink stopping 
> progressing watermark resembled this one?
>
> Thanks.
>
>  Jan
>
> On 6/14/21 4:11 PM, Eddy G wrote:
>
> Hi Jan,
>
> I've added --experiments=use_deprecated_read and it seems to work flawlessly 
> (with my current Window and the one proposed by Evan).
>
> Why is this? Do Splittable DoFn now break current implementations? Are there 
> any posts of possible breaking changes?
>
> On 2021/06/14 13:19:39, Jan Lukavský <je...@seznam.cz> wrote:
>
> Hi Eddy,
>
> answers inline.
>
> On 6/14/21 3:05 PM, Eddy G wrote:
>
> Hi Jan,
>
> Thanks for replying so fast!
>
> Regarding your questions,
>
> - "Does your data get buffered in a state?"
> Yes, I do have a state within a stage prior ParquetIO writing together with a 
> Timer with PROCESSING_TIME.
>
> The stage which contains the state does send bytes to the next one which is 
> the ParquetIO writing. Seems the @OnTimer doesn't get triggered and it's not 
> clearing the state. This however does work under normal circumstances without 
> having too much data queued waiting to be processed.
>
> OK, this suggests, that the watermark is for some reason "stuck". If you
> checkpoints enabled, you should see the size of the checkpoint to grow
> over time.
>
> - "Do you see watermark being updated in your Flink WebUI?"
> The stages that do have a watermark don't get updated. The same watermark 
> value has been constant since the pipeline started.
>
> If no lateness is set, any late data should be admitted right?
>
> If no lateness is set, it means allowed lateness of Duration.ZERO, which
> means that data that arrive after end-of-window will be dropped.
>
> Regarding 'droppedDueToLateness' metric, can't see it exposed anywhere, 
> neither in Flink UI or Prometheus. I've seen it in Dataflow but seems to be a 
> Dataflow specific metric right?
>
> Should not be Dataflow specific. But if you don't see it, it means it
> could be zero. So, we can rule this out.
>
> We're using KinesisIO for reading messages.
>
> Kinesis uses UnboundedSource, which is expended to SDF starting from
> Beam 2.25.0. The flag should change that as well. Can you try the
> --experiments=use_deprecated_read and see if you Pipeline DAG changes
> (should not contain Impulse transform at the beginning) and if it solves
> your issues?
>
> On 2021/06/14 12:48:58, Jan Lukavský <je...@seznam.cz> wrote:
>
> Hi Eddy,
>
> does your data get buffered in a state - e.g. does the size of the state
> grow over time? Do you see watermark being updated in your Flink WebUI?
> When a stateful operation (and GroupByKey is a stateful operation) does
> not output any data, the first place to look at is if watermark
> correctly progresses. If it does not progress, then the input data must
> be buffered in state and the size of the state should grow over time. If
> it progresses, then it might be the case, that the data is too late
> after the watermark (the watermark estimator might need tuning) and the
> data gets dropped (note you don't set any allowed lateness, which
> _might_ cause issues). You could see if your pipeline drops data in
> "droppedDueToLateness" metric. The size of you state would not grow much
> in that situation.
>
> Another hint - If you use KafkaIO, try to disable SDF wrapper for it
> using "--experiments=use_deprecated_read" on command line (which you
> then must pass to PipelineOptionsFactory). There is some suspicion that
> SDF wrapper for Kafka might not work as expected in certain situations
> with Flink.
>
> Please feel free to share any results,
>
>     Jan
>
> On 6/14/21 1:39 PM, Eddy G wrote:
>
> As seen in this image https://imgur.com/a/wrZET97, I'm trying to deal with 
> late data (intentionally stopped my consumer so data has been accumulating 
> for several days now). Now, with the following Window... I'm using Beam 2.27 
> and Flink 1.12.
>
>                               
> Window.into(FixedWindows.of(Duration.standardMinutes(10)))
>
> And several parsing stages after, once it's time to write within the 
> ParquetIO stage...
>
>                               FileIO
>                                   .<String, MyClass>writeDynamic()
>                                   .by(...)
>                                   .via(...)
>                                   .to(...)
>                                   .withNaming(...)
>                                   .withDestinationCoder(StringUtf8Coder.of())
>                                   .withNumShards(options.getNumShards())
>
> it won't send bytes across all stages so no data is being written, still it 
> accumulates in the first stage seen in the image and won't go further than 
> that.
>
> Any reason why this may be happening? Wrong windowing strategy?

Reply via email to