I wanted to circle back and note that using release 2.31.0, with _or_ without --experiments=use_deprecated_read, seems to resolve the slowness in my case. It's still on my radar to extract a minimal pipeline from the previously problematic one, to hopefully aid in debugging efforts.
Thanks,
Evan

On Mon, Jun 14, 2021 at 8:39 PM Robert Bradshaw <[email protected]> wrote:

> Awesome, thanks!
>
> On Mon, Jun 14, 2021 at 5:36 PM Evan Galpin <[email protected]> wrote:
> >
> > I’ll try to create something as small as possible from the pipeline I mentioned 👍 I should have time this week to do so.
> >
> > Thanks,
> > Evan
> >
> > On Mon, Jun 14, 2021 at 18:09 Robert Bradshaw <[email protected]> wrote:
> >>
> >> Is it possible to post the code? (Or the code of a similar, but minimal, pipeline that exhibits the same issues?)
> >>
> >> On Mon, Jun 14, 2021 at 2:15 PM Evan Galpin <[email protected]> wrote:
> >> >
> >> > @robert I have a pipeline which consistently shows a major slowdown (10 seconds vs 10 minutes) between versions <=2.23.0 and >=2.25.0, and it can be boiled down to:
> >> >
> >> > - Read GCS file patterns from PubSub
> >> > - Window into fixed windows (repeating every 15 seconds)
> >> > - Deduplicate/Distinct (have tried both)
> >> > - Read GCS blobs via the patterns from the first step
> >> > - Write file contents to sink
> >> >
> >> > It doesn't seem to matter whether there are 0 messages or 50k messages in the subscription at startup. The rate of new messages, however, is very low. Not sure if those are helpful details; let me know if there's anything else specific which would help.
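For reference, here is a minimal sketch of a pipeline with the shape Evan describes above. It is not Evan's actual code: the subscription path, the use of FileIO.matchAll()/readMatches() plus TextIO.readFiles() for the GCS reads, and the omitted sink are all assumptions.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Deduplicate;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    public class FilePatternRepro {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // 1) Read GCS file patterns from Pub/Sub (hypothetical subscription).
        PCollection<String> patterns =
            p.apply(PubsubIO.readStrings()
                .fromSubscription("projects/my-project/subscriptions/file-patterns"));

        // 2) Fixed 15-second windows, then 3) deduplicate the patterns
        //    (Distinct.create() is the other variant mentioned in the thread).
        PCollection<String> distinctPatterns =
            patterns
                .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(15))))
                .apply(Deduplicate.<String>values());

        // 4) Expand the patterns and read the contents of the matched GCS blobs.
        PCollection<String> contents =
            distinctPatterns
                .apply(FileIO.matchAll())
                .apply(FileIO.readMatches())
                .apply(TextIO.readFiles());

        // 5) Write the file contents to a sink (omitted here).

        p.run();
      }
    }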
> >> >
> >> > On Mon, Jun 14, 2021 at 12:44 PM Robert Bradshaw <[email protected]> wrote:
> >> >>
> >> >> +1, we'd really like to get to the bottom of this, so clear instructions on a pipeline/conditions that can reproduce it would be great.
> >> >>
> >> >> On Mon, Jun 14, 2021 at 7:34 AM Jan Lukavský <[email protected]> wrote:
> >> >> >
> >> >> > Hi Eddy,
> >> >> >
> >> >> > you are probably hitting a not-yet-discovered bug in the SDF implementation in FlinkRunner that (under some currently unknown conditions) seems to stop advancing the watermark. This has been observed in one other instance (that I'm aware of). I think we don't yet have a tracking JIRA for that; would you mind filing it? It would be awesome if you could include an estimate of the messages-per-second throughput that causes the issue in your case.
> >> >> >
> >> >> > +Tobias Kaymak
> >> >> >
> >> >> > Tobias, could you please confirm that the case you had with Flink stopping progressing the watermark resembled this one?
> >> >> >
> >> >> > Thanks.
> >> >> >
> >> >> > Jan
> >> >> >
> >> >> > On 6/14/21 4:11 PM, Eddy G wrote:
> >> >> >
> >> >> > Hi Jan,
> >> >> >
> >> >> > I've added --experiments=use_deprecated_read and it seems to work flawlessly (with my current Window and the one proposed by Evan).
> >> >> >
> >> >> > Why is this? Do Splittable DoFns now break existing implementations? Are there any posts about possible breaking changes?
> >> >> >
> >> >> > On 2021/06/14 13:19:39, Jan Lukavský <[email protected]> wrote:
> >> >> >
> >> >> > Hi Eddy,
> >> >> >
> >> >> > answers inline.
> >> >> >
> >> >> > On 6/14/21 3:05 PM, Eddy G wrote:
> >> >> >
> >> >> > Hi Jan,
> >> >> >
> >> >> > Thanks for replying so fast!
> >> >> >
> >> >> > Regarding your questions,
> >> >> >
> >> >> > - "Does your data get buffered in a state?"
> >> >> > Yes, I do have a state within a stage prior to the ParquetIO write, together with a Timer with PROCESSING_TIME.
> >> >> >
> >> >> > The stage which contains the state does send bytes to the next one, which is the ParquetIO write. It seems the @OnTimer doesn't get triggered and it's not clearing the state. This does, however, work under normal circumstances, without too much data queued waiting to be processed.
> >> >> >
> >> >> > OK, this suggests that the watermark is for some reason "stuck". If you have checkpoints enabled, you should see the size of the checkpoint grow over time.
> >> >> >
> >> >> > - "Do you see the watermark being updated in your Flink WebUI?"
> >> >> > The stages that do have a watermark don't get updated. The same watermark value has been constant since the pipeline started.
> >> >> >
> >> >> > If no lateness is set, any late data should be admitted, right?
> >> >> >
> >> >> > If no lateness is set, it means an allowed lateness of Duration.ZERO, which means that data that arrives after the end of the window will be dropped.
> >> >> >
> >> >> > Regarding the 'droppedDueToLateness' metric, I can't see it exposed anywhere, neither in the Flink UI nor in Prometheus. I've seen it in Dataflow, but it seems to be a Dataflow-specific metric, right?
> >> >> >
> >> >> > It should not be Dataflow-specific. But if you don't see it, it could be zero. So we can rule this out.
> >> >> >
> >> >> > We're using KinesisIO for reading messages.
> >> >> >
> >> >> > Kinesis uses UnboundedSource, which is expanded to SDF starting from Beam 2.25.0. The flag should change that as well. Can you try --experiments=use_deprecated_read and see if your pipeline DAG changes (it should not contain an Impulse transform at the beginning) and if it solves your issues?
> >> >> >
> >> >> > On 2021/06/14 12:48:58, Jan Lukavský <[email protected]> wrote:
> >> >> >
> >> >> > Hi Eddy,
> >> >> >
> >> >> > does your data get buffered in a state - e.g. does the size of the state grow over time? Do you see the watermark being updated in your Flink WebUI? When a stateful operation (and GroupByKey is a stateful operation) does not output any data, the first place to look is whether the watermark correctly progresses. If it does not progress, then the input data must be buffered in state and the size of the state should grow over time. If it progresses, then it might be the case that the data is too late after the watermark (the watermark estimator might need tuning) and the data gets dropped (note you don't set any allowed lateness, which _might_ cause issues). You can check whether your pipeline drops data via the "droppedDueToLateness" metric. The size of your state would not grow much in that situation.
> >> >> >
> >> >> > Another hint - if you use KafkaIO, try to disable the SDF wrapper for it using "--experiments=use_deprecated_read" on the command line (which you then must pass to PipelineOptionsFactory). There is some suspicion that the SDF wrapper for Kafka might not work as expected in certain situations with Flink.
> >> >> >
> >> >> > Please feel free to share any results,
> >> >> >
> >> >> > Jan
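A minimal sketch (not Eddy's actual code) of the kind of stage described earlier in the thread: state buffered in a stage ahead of the ParquetIO write, flushed and cleared from an @OnTimer callback on a PROCESSING_TIME timer. The class name, element types, and one-minute flush interval are all made up; the point is only to show where the state and the timer sit. If the timer callback never fires, the buffered data never reaches the downstream write.

    import org.apache.beam.sdk.state.BagState;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.TimeDomain;
    import org.apache.beam.sdk.state.Timer;
    import org.apache.beam.sdk.state.TimerSpec;
    import org.apache.beam.sdk.state.TimerSpecs;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.joda.time.Duration;

    class BufferAndFlushFn extends DoFn<KV<String, String>, String> {

      @StateId("buffer")
      private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag();

      @TimerId("flush")
      private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

      @ProcessElement
      public void process(
          @Element KV<String, String> element,
          @StateId("buffer") BagState<String> buffer,
          @TimerId("flush") Timer flush) {
        // Buffer the element and (re)arm a processing-time timer for one minute from now.
        buffer.add(element.getValue());
        flush.offset(Duration.standardMinutes(1)).setRelative();
      }

      @OnTimer("flush")
      public void onFlush(
          OnTimerContext context,
          @StateId("buffer") BagState<String> buffer) {
        // Emit everything buffered so far, then clear the state. If this callback
        // never runs, nothing is emitted to the write stage and the state only grows.
        for (String value : buffer.read()) {
          context.output(value);
        }
        buffer.clear();
      }
    }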
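Jan's point about lateness can be made explicit on the 10-minute window that appears in the original message further down: without .withAllowedLateness(...) the allowed lateness is Duration.ZERO, so elements arriving after the end of their window are dropped. The two-day value and the "parsed" collection name below are purely illustrative.

    // Sketch only: the same 10-minute fixed windows, but with a non-zero allowed lateness.
    parsed.apply(
        Window.<MyClass>into(FixedWindows.of(Duration.standardMinutes(10)))
            .withAllowedLateness(Duration.standardDays(2)) // illustrative value
            .discardingFiredPanes());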
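Also, since --experiments=use_deprecated_read comes up repeatedly: as Jan notes, the flag only takes effect if the command-line arguments actually reach PipelineOptionsFactory. A rough sketch of that wiring (class name and structure assumed):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class Main {
      public static void main(String[] args) {
        // e.g. --runner=FlinkRunner --experiments=use_deprecated_read
        PipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().create();
        Pipeline pipeline = Pipeline.create(options);
        // ... build the pipeline ...
        pipeline.run();
      }
    }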
> >> >> > On 6/14/21 1:39 PM, Eddy G wrote:
> >> >> >
> >> >> > As seen in this image https://imgur.com/a/wrZET97, I'm trying to deal with late data (I intentionally stopped my consumer, so data has been accumulating for several days now). I'm using Beam 2.27 and Flink 1.12. Now, with the following Window...
> >> >> >
> >> >> >     Window.into(FixedWindows.of(Duration.standardMinutes(10)))
> >> >> >
> >> >> > and several parsing stages after it, once it's time to write within the ParquetIO stage...
> >> >> >
> >> >> >     FileIO
> >> >> >         .<String, MyClass>writeDynamic()
> >> >> >         .by(...)
> >> >> >         .via(...)
> >> >> >         .to(...)
> >> >> >         .withNaming(...)
> >> >> >         .withDestinationCoder(StringUtf8Coder.of())
> >> >> >         .withNumShards(options.getNumShards())
> >> >> >
> >> >> > ...it won't send bytes across all stages, so no data is being written; it still accumulates in the first stage seen in the image and won't go further than that.
> >> >> >
> >> >> > Any reason why this may be happening? Wrong windowing strategy?
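For comparison, a more filled-in version of that writeDynamic step. Everything beyond what is quoted above is hypothetical: the windowedRecords input, the getCategory() destination key, the toGenericRecord() conversion and the Avro SCHEMA handed to ParquetIO.sink(), the output path option, and the file naming.

    windowedRecords.apply(
        FileIO.<String, MyClass>writeDynamic()
            // route each element to a destination key (hypothetical getter)
            .by(MyClass::getCategory)
            // convert each element to a GenericRecord and write it via the Parquet sink
            .via(Contextful.fn(MyClass::toGenericRecord), ParquetIO.sink(SCHEMA))
            .to(options.getOutputPath())
            .withNaming(category -> FileIO.Write.defaultNaming("records-" + category, ".parquet"))
            .withDestinationCoder(StringUtf8Coder.of())
            .withNumShards(options.getNumShards()));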
