+1, we'd really like to get to the bottom of this, so clear instructions on a pipeline/conditions that can reproduce it would be great.
On Mon, Jun 14, 2021 at 7:34 AM Jan Lukavský <je...@seznam.cz> wrote:
>
> Hi Eddy,
>
> you are probably hitting a not-yet discovered bug in the SDF implementation
> in the FlinkRunner that (under some currently unknown conditions) seems to
> stop advancing the watermark. This has been observed in one other instance
> (that I'm aware of). I think we don't yet have a tracking JIRA for it;
> would you mind filing one? It would be awesome if you could include an
> estimate of the messages-per-second throughput that triggers the issue in
> your case.
>
> +Tobias Kaymak
>
> Tobias, could you please confirm that the case you had with Flink stopping
> progressing the watermark resembled this one?
>
> Thanks,
>
> Jan
>
> On 6/14/21 4:11 PM, Eddy G wrote:
> > Hi Jan,
> >
> > I've added --experiments=use_deprecated_read and it seems to work
> > flawlessly (with my current Window and the one proposed by Evan).
> >
> > Why is this? Do Splittable DoFns now break current implementations? Are
> > there any posts about possible breaking changes?
> >
> > On 2021/06/14 13:19:39, Jan Lukavský <je...@seznam.cz> wrote:
> > > Hi Eddy,
> > >
> > > answers inline.
> > >
> > > On 6/14/21 3:05 PM, Eddy G wrote:
> > > > Hi Jan,
> > > >
> > > > Thanks for replying so fast! Regarding your questions,
> > > >
> > > > - "Does your data get buffered in a state?"
> > > > Yes, I do have a state in a stage prior to the ParquetIO write,
> > > > together with a Timer with PROCESSING_TIME.
> > > >
> > > > The stage which contains the state does send bytes to the next one,
> > > > which is the ParquetIO write. It seems the @OnTimer doesn't get
> > > > triggered, so it's not clearing the state. This does work under
> > > > normal circumstances, without too much data queued waiting to be
> > > > processed.
> > > OK, this suggests that the watermark is for some reason "stuck". If
> > > you have checkpointing enabled, you should see the size of the
> > > checkpoint grow over time.
> > > > - "Do you see watermark being updated in your Flink WebUI?"
> > > > The stages that do have a watermark don't get updated. The same
> > > > watermark value has been constant since the pipeline started.
> > > >
> > > > If no lateness is set, any late data should be admitted, right?
> > > If no lateness is set, it means an allowed lateness of Duration.ZERO,
> > > which means that data arriving after the end of the window will be
> > > dropped.
> > > > Regarding the 'droppedDueToLateness' metric, I can't see it exposed
> > > > anywhere, neither in the Flink UI nor in Prometheus. I've seen it in
> > > > Dataflow, but it seems to be a Dataflow-specific metric, right?
> > > It should not be Dataflow-specific. But if you don't see it, it could
> > > be zero, so we can rule this out.
> > > > We're using KinesisIO for reading messages.
> > > Kinesis uses UnboundedSource, which is expanded to SDF starting from
> > > Beam 2.25.0. The flag should change that as well. Can you try
> > > --experiments=use_deprecated_read and see if your Pipeline DAG changes
> > > (it should not contain an Impulse transform at the beginning) and
> > > whether it solves your issues?
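A minimal sketch of wiring the flag through PipelineOptionsFactory, as
suggested above and in the quoted message below; the class name and the
pipeline body are illustrative, not from the thread:

  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.options.PipelineOptions;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;

  public class MyPipeline {
    public static void main(String[] args) {
      // Invoked with e.g.:
      //   --runner=FlinkRunner --experiments=use_deprecated_read
      // fromArgs() forwards --experiments to the runner, which then expands
      // KinesisIO/KafkaIO reads via the legacy UnboundedSource path instead
      // of the SDF wrapper.
      PipelineOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation().create();

      Pipeline pipeline = Pipeline.create(options);
      // ... build the pipeline as usual ...
      pipeline.run().waitUntilFinish();
    }
  }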
> > > > On 2021/06/14 12:48:58, Jan Lukavský <je...@seznam.cz> wrote:
> > > > > Hi Eddy,
> > > > >
> > > > > does your data get buffered in a state - e.g. does the size of the
> > > > > state grow over time? Do you see the watermark being updated in
> > > > > your Flink WebUI? When a stateful operation (and GroupByKey is a
> > > > > stateful operation) does not output any data, the first place to
> > > > > look is whether the watermark progresses correctly. If it does not
> > > > > progress, then the input data must be buffered in state and the
> > > > > size of the state should grow over time. If it progresses, then it
> > > > > might be the case that the data is too late after the watermark
> > > > > (the watermark estimator might need tuning) and the data gets
> > > > > dropped (note you don't set any allowed lateness, which _might_
> > > > > cause issues). You could check whether your pipeline drops data
> > > > > via the "droppedDueToLateness" metric. The size of your state
> > > > > would not grow much in that situation.
> > > > >
> > > > > Another hint - if you use KafkaIO, try disabling the SDF wrapper
> > > > > for it using "--experiments=use_deprecated_read" on the command
> > > > > line (which you then must pass to PipelineOptionsFactory). There
> > > > > is some suspicion that the SDF wrapper for Kafka might not work as
> > > > > expected in certain situations with Flink.
> > > > >
> > > > > Please feel free to share any results,
> > > > >
> > > > > Jan
> > > > >
> > > > > On 6/14/21 1:39 PM, Eddy G wrote:
> > > > > > As seen in this image https://imgur.com/a/wrZET97, I'm trying to
> > > > > > deal with late data (I intentionally stopped my consumer, so
> > > > > > data has been accumulating for several days now). I'm using Beam
> > > > > > 2.27 and Flink 1.12, with the following Window...
> > > > > >
> > > > > > Window.into(FixedWindows.of(Duration.standardMinutes(10)))
> > > > > >
> > > > > > And several parsing stages after, once it's time to write within
> > > > > > the ParquetIO stage...
> > > > > >
> > > > > > FileIO
> > > > > >     .<String, MyClass>writeDynamic()
> > > > > >     .by(...)
> > > > > >     .via(...)
> > > > > >     .to(...)
> > > > > >     .withNaming(...)
> > > > > >     .withDestinationCoder(StringUtf8Coder.of())
> > > > > >     .withNumShards(options.getNumShards())
> > > > > >
> > > > > > it won't send bytes across all stages, so no data is being
> > > > > > written; the data still accumulates in the first stage seen in
> > > > > > the image and won't go further than that.
> > > > > >
> > > > > > Any reason why this may be happening? A wrong windowing strategy?
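To make the allowed-lateness point above concrete, a sketch of the
windowing strategy with an explicit bound; the one-hour value, the "parsed"
PCollection, and MyClass are illustrative assumptions, not from the thread:

  import org.apache.beam.sdk.transforms.windowing.FixedWindows;
  import org.apache.beam.sdk.transforms.windowing.Window;
  import org.apache.beam.sdk.values.PCollection;
  import org.joda.time.Duration;

  // "parsed" stands for the upstream PCollection<MyClass> produced by the
  // parsing stages described in the thread.
  PCollection<MyClass> windowed =
      parsed.apply(
          Window.<MyClass>into(FixedWindows.of(Duration.standardMinutes(10)))
              // Admit elements up to 1 hour behind the watermark instead of
              // dropping them (the default allowed lateness is Duration.ZERO).
              .withAllowedLateness(Duration.standardHours(1))
              // Emit each pane once and discard its contents afterwards.
              .discardingFiredPanes());

Whether late drops are actually happening can then be checked against the
droppedDueToLateness counter mentioned earlier in the thread.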