Is it possible to post the code? (Or the code of a similar, but minimal, pipeline that exhibits the same issues?)
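For reference, the windowing step in the pipeline Evan describes below (15-second fixed windows followed by a distinct/deduplicate) can be modeled in plain Java. This is a toy sketch, not Beam code: the class FixedWindowDistinct and its methods are hypothetical, windows are assumed to be aligned to epoch 0 with no offset, and it only shows which elements land in the same window and are therefore collapsed together.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Toy model (not Beam code, hypothetical class): assigns event timestamps to
 * fixed, non-overlapping windows aligned to epoch 0 and keeps each distinct
 * value once per window.
 */
final class FixedWindowDistinct {

    /** Start of the fixed window that contains timestampMillis. */
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        // Math.floorMod keeps the result correct for negative timestamps too.
        return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
    }

    /**
     * Groups (timestamp, value) pairs by window start; within each window a value
     * is kept only once, in first-seen order. The same value appearing in two
     * different windows is kept in both.
     */
    static Map<Long, Set<String>> distinctPerWindow(
            List<Map.Entry<Long, String>> events, long windowSizeMillis) {
        Map<Long, Set<String>> byWindow = new LinkedHashMap<>();
        for (Map.Entry<Long, String> event : events) {
            long start = windowStart(event.getKey(), windowSizeMillis);
            byWindow.computeIfAbsent(start, k -> new LinkedHashSet<>()).add(event.getValue());
        }
        return byWindow;
    }
}
```

Because the de-duplication here is scoped to a window, a GroupByKey-based distinct with the default trigger can only emit a window's result once the runner considers that window complete, so a watermark that stops advancing shows up as missing output rather than as an error.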
On Mon, Jun 14, 2021 at 2:15 PM Evan Galpin <evan.gal...@gmail.com> wrote:
>
> @robert I have a pipeline which consistently shows a major slowdown (10 seconds vs. 10 minutes) between versions <=2.23.0 and >=2.25.0 that can be boiled down to:
>
> - Read GCS file patterns from PubSub
> - Window into fixed windows (repeating every 15 seconds)
> - Deduplicate/distinct (have tried both)
> - Read GCS blobs via patterns from the first step
> - Write file contents to sink
>
> It doesn't seem to matter whether there are 0 or 50k messages in the subscription at startup. The rate of new messages, however, is very low. Not sure if those are helpful details; let me know if there's anything else specific which would help.
>
> On Mon, Jun 14, 2021 at 12:44 PM Robert Bradshaw <rober...@google.com> wrote:
>>
>> +1, we'd really like to get to the bottom of this, so clear instructions on a pipeline/conditions that can reproduce it would be great.
>>
>> On Mon, Jun 14, 2021 at 7:34 AM Jan Lukavský <je...@seznam.cz> wrote:
>> >
>> > Hi Eddy,
>> >
>> > you are probably hitting a not-yet-discovered bug in the SDF implementation in FlinkRunner that (under some currently unknown conditions) seems to stop advancing the watermark. This has been observed in one other instance (that I'm aware of). I think we don't yet have a tracking JIRA for that; would you mind filing it? It would be awesome if you could include an estimate of the messages-per-second throughput that causes the issue in your case.
>> >
>> > +Tobias Kaymak
>> >
>> > Tobias, could you please confirm that the case you had, with Flink no longer progressing the watermark, resembled this one?
>> >
>> > Thanks.
>> >
>> > Jan
>> >
>> > On 6/14/21 4:11 PM, Eddy G wrote:
>> >
>> > Hi Jan,
>> >
>> > I've added --experiments=use_deprecated_read and it seems to work flawlessly (with my current Window and the one proposed by Evan).
>> >
>> > Why is this?
>> > Do Splittable DoFns now break current implementations? Are there any posts about possible breaking changes?
>> >
>> > On 2021/06/14 13:19:39, Jan Lukavský <je...@seznam.cz> wrote:
>> >
>> > Hi Eddy,
>> >
>> > answers inline.
>> >
>> > On 6/14/21 3:05 PM, Eddy G wrote:
>> >
>> > Hi Jan,
>> >
>> > Thanks for replying so fast!
>> >
>> > Regarding your questions,
>> >
>> > - "Does your data get buffered in a state?"
>> > Yes, I do have state within a stage prior to the ParquetIO write, together with a Timer with PROCESSING_TIME.
>> >
>> > The stage which contains the state does send bytes to the next one, which is the ParquetIO write. It seems the @OnTimer doesn't get triggered, so it's not clearing the state. This does, however, work under normal circumstances, when there isn't too much data queued waiting to be processed.
>> >
>> > OK, this suggests that the watermark is for some reason "stuck". If you have checkpoints enabled, you should see the size of the checkpoint grow over time.
>> >
>> > - "Do you see watermark being updated in your Flink WebUI?"
>> > The stages that do have a watermark don't get updated. The same watermark value has been constant since the pipeline started.
>> >
>> > If no lateness is set, any late data should be admitted, right?
>> >
>> > If no lateness is set, it means an allowed lateness of Duration.ZERO, which means that data that arrive after the end of the window will be dropped.
>> >
>> > Regarding the 'droppedDueToLateness' metric, I can't see it exposed anywhere, neither in the Flink UI nor in Prometheus. I've seen it in Dataflow, but it seems to be a Dataflow-specific metric, right?
>> >
>> > It should not be Dataflow-specific. But if you don't see it, it means it could be zero. So, we can rule this out.
>> >
>> > We're using KinesisIO for reading messages.
>> >
>> > Kinesis uses UnboundedSource, which is expanded to SDF starting from Beam 2.25.0. The flag should change that as well.
>> > Can you try --experiments=use_deprecated_read and see whether your pipeline DAG changes (it should not contain an Impulse transform at the beginning) and whether it solves your issues?
>> >
>> > On 2021/06/14 12:48:58, Jan Lukavský <je...@seznam.cz> wrote:
>> >
>> > Hi Eddy,
>> >
>> > does your data get buffered in state, e.g. does the size of the state grow over time? Do you see the watermark being updated in your Flink WebUI? When a stateful operation (and GroupByKey is a stateful operation) does not output any data, the first thing to check is whether the watermark progresses correctly. If it does not progress, then the input data must be buffered in state and the size of the state should grow over time. If it does progress, then it might be that the data is too late relative to the watermark (the watermark estimator might need tuning) and gets dropped (note that you don't set any allowed lateness, which _might_ cause issues). You can check whether your pipeline drops data via the "droppedDueToLateness" metric. The size of your state would not grow much in that situation.
>> >
>> > Another hint: if you use KafkaIO, try to disable the SDF wrapper for it using "--experiments=use_deprecated_read" on the command line (which you then must pass to PipelineOptionsFactory). There is some suspicion that the SDF wrapper for Kafka might not work as expected in certain situations with Flink.
>> >
>> > Please feel free to share any results,
>> >
>> > Jan
>> >
>> > On 6/14/21 1:39 PM, Eddy G wrote:
>> >
>> > As seen in this image https://imgur.com/a/wrZET97, I'm trying to deal with late data (I intentionally stopped my consumer, so data has been accumulating for several days now). Now, with the following Window... I'm using Beam 2.27 and Flink 1.12.
>> >
>> > Window.into(FixedWindows.of(Duration.standardMinutes(10)))
>> >
>> > And several parsing stages after that, once it's time to write within the ParquetIO stage...
>> >
>> > FileIO
>> >     .<String, MyClass>writeDynamic()
>> >     .by(...)
>> >     .via(...)
>> >     .to(...)
>> >     .withNaming(...)
>> >     .withDestinationCoder(StringUtf8Coder.of())
>> >     .withNumShards(options.getNumShards())
>> >
>> > it won't send bytes across all stages, so no data is being written; the data still accumulates in the first stage seen in the image and won't go any further than that.
>> >
>> > Any reason why this may be happening? Wrong windowing strategy?
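Jan's diagnosis in the thread can be illustrated with a small simulation. When the watermark is stuck, a stateful step emits nothing and its state keeps growing. When no allowed lateness is set, data arriving after the end of its window is dropped. This is a simplified model under stated assumptions, not how Beam or the FlinkRunner implement windowing. The class WatermarkBuffer is hypothetical, and windows are epoch-aligned fixed windows. A window's contents are emitted once the watermark reaches the window end. An element is dropped once the watermark has reached its window end plus the allowed lateness. The counter is named after the droppedDueToLateness metric discussed above, but here it is only a field.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/**
 * Simplified model (hypothetical class, not Beam's implementation) of a stateful,
 * event-time grouping step over fixed windows that fires at the end of each window
 * and honours a configurable allowed lateness.
 */
final class WatermarkBuffer {
    private final long windowSizeMillis;
    private final long allowedLatenessMillis;
    // Buffered "state": window start -> elements waiting for their window to close.
    private final TreeMap<Long, List<String>> buffered = new TreeMap<>();
    private long watermarkMillis = Long.MIN_VALUE;
    private long dropped = 0;

    WatermarkBuffer(long windowSizeMillis, long allowedLatenessMillis) {
        this.windowSizeMillis = windowSizeMillis;
        this.allowedLatenessMillis = allowedLatenessMillis;
    }

    /** Buffers an element, or drops it if the watermark is already past its window end + lateness. */
    void add(long timestampMillis, String value) {
        long start = timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
        long end = start + windowSizeMillis;
        if (end + allowedLatenessMillis <= watermarkMillis) {
            dropped++;
            return;
        }
        buffered.computeIfAbsent(start, k -> new ArrayList<>()).add(value);
    }

    /**
     * Moves the watermark forward (it never moves back) and emits, in window order,
     * every buffered element whose window end has been reached. If the watermark is
     * stuck, nothing is emitted and the buffer keeps growing.
     */
    List<String> advanceWatermark(long newWatermarkMillis) {
        watermarkMillis = Math.max(watermarkMillis, newWatermarkMillis);
        List<String> emitted = new ArrayList<>();
        while (!buffered.isEmpty() && buffered.firstKey() + windowSizeMillis <= watermarkMillis) {
            emitted.addAll(buffered.pollFirstEntry().getValue());
        }
        return emitted;
    }

    /** Number of elements currently held in the buffer ("state size"). */
    int bufferedCount() {
        return buffered.values().stream().mapToInt(List::size).sum();
    }

    long droppedDueToLateness() {
        return dropped;
    }
}
```

The model assumes 10-minute windows and zero allowed lateness, as in Eddy's Window. Under those assumptions, elements sit in the buffer for as long as the watermark stays below the window end. Once the watermark advances, they are emitted, and any later element for that same window is counted as dropped. A non-zero allowed lateness keeps such an element instead.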