There have been varied reports of slowness loosely attributed to the SDF default wrapper change introduced in 2.25.0. For example:
https://lists.apache.org/thread.html/re6b0941a8b4951293a0327ce9b25e607cafd6e45b69783f65290edee%40%3Cdev.beam.apache.org%3E
https://issues.apache.org/jira/plugins/servlet/mobile#issue/BEAM-10670/comment/17316858
http://mail-archives.apache.org/mod_mbox/beam-dev/202105.mbox/%3ccae7uba_v0vfl9ck7n06n2zf6e+xcebdircez7yftlfwuvch...@mail.gmail.com%3e

On Mon, Jun 14, 2021 at 10:11 Eddy G <[email protected]> wrote:

> Hi Jan,
>
> I've added --experiments=use_deprecated_read and it seems to work
> flawlessly (with my current Window and the one proposed by Evan).
>
> Why is this? Do Splittable DoFns now break current implementations? Are
> there any posts about possible breaking changes?
>
> On 2021/06/14 13:19:39, Jan Lukavský <[email protected]> wrote:
> > Hi Eddy,
> >
> > answers inline.
> >
> > On 6/14/21 3:05 PM, Eddy G wrote:
> > > Hi Jan,
> > >
> > > Thanks for replying so fast!
> > >
> > > Regarding your questions,
> > >
> > > - "Does your data get buffered in a state?"
> > > Yes, I do have state in a stage prior to the ParquetIO write,
> > > together with a Timer with PROCESSING_TIME.
> > >
> > > The stage which contains the state does send bytes to the next one,
> > > which is the ParquetIO write. It seems the @OnTimer doesn't get
> > > triggered and it's not clearing the state. This does work under normal
> > > circumstances, however, without too much data queued waiting to be
> > > processed.
> > OK, this suggests that the watermark is for some reason "stuck". If you
> > have checkpoints enabled, you should see the size of the checkpoint grow
> > over time.
> > >
> > > - "Do you see the watermark being updated in your Flink WebUI?"
> > > The stages that do have a watermark don't get updated. The same
> > > watermark value has been constant since the pipeline started.
> > >
> > > If no lateness is set, any late data should be admitted, right?
> > If no lateness is set, it means an allowed lateness of Duration.ZERO,
> > which means that data arriving after the end of the window will be
> > dropped.
> > >
> > > Regarding the 'droppedDueToLateness' metric, I can't see it exposed
> > > anywhere, neither in the Flink UI nor in Prometheus. I've seen it in
> > > Dataflow, but it seems to be a Dataflow-specific metric, right?
> > It should not be Dataflow-specific. But if you don't see it, it could be
> > zero, so we can rule this out.
> > >
> > > We're using KinesisIO for reading messages.
> > Kinesis uses UnboundedSource, which is expanded to SDF starting from
> > Beam 2.25.0. The flag should change that as well. Can you try
> > --experiments=use_deprecated_read and see if your Pipeline DAG changes
> > (it should not contain an Impulse transform at the beginning) and if it
> > solves your issues?
> > >
> > > On 2021/06/14 12:48:58, Jan Lukavský <[email protected]> wrote:
> > >> Hi Eddy,
> > >>
> > >> does your data get buffered in a state - e.g. does the size of the
> > >> state grow over time? Do you see the watermark being updated in your
> > >> Flink WebUI? When a stateful operation (and GroupByKey is a stateful
> > >> operation) does not output any data, the first place to look is
> > >> whether the watermark progresses correctly. If it does not progress,
> > >> then the input data must be buffered in state and the size of the
> > >> state should grow over time. If it does progress, then it might be
> > >> the case that the data is too late after the watermark (the watermark
> > >> estimator might need tuning) and the data gets dropped (note you
> > >> don't set any allowed lateness, which _might_ cause issues). You can
> > >> check whether your pipeline drops data via the "droppedDueToLateness"
> > >> metric. The size of your state would not grow much in that situation.
> > >>
> > >> Another hint - if you use KafkaIO, try to disable the SDF wrapper for
> > >> it using "--experiments=use_deprecated_read" on the command line
> > >> (which you then must pass to PipelineOptionsFactory). There is some
> > >> suspicion that the SDF wrapper for Kafka might not work as expected
> > >> in certain situations with Flink.
> > >>
> > >> Please feel free to share any results,
> > >>
> > >> Jan
> > >>
> > >> On 6/14/21 1:39 PM, Eddy G wrote:
> > >>> As seen in this image https://imgur.com/a/wrZET97, I'm trying to
> > >>> deal with late data (I intentionally stopped my consumer, so data
> > >>> has been accumulating for several days now). I'm using Beam 2.27 and
> > >>> Flink 1.12. Now, with the following Window...
> > >>>
> > >>> Window.into(FixedWindows.of(Duration.standardMinutes(10)))
> > >>>
> > >>> And several parsing stages after that, once it's time to write
> > >>> within the ParquetIO stage...
> > >>>
> > >>> FileIO
> > >>>     .<String, MyClass>writeDynamic()
> > >>>     .by(...)
> > >>>     .via(...)
> > >>>     .to(...)
> > >>>     .withNaming(...)
> > >>>     .withDestinationCoder(StringUtf8Coder.of())
> > >>>     .withNumShards(options.getNumShards())
> > >>>
> > >>> it won't send bytes across all stages, so no data is being written;
> > >>> still, data accumulates in the first stage seen in the image and
> > >>> won't go further than that.
> > >>>
> > >>> Any reason why this may be happening? A wrong windowing strategy?
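A minimal sketch of the workaround Jan describes above: passing --experiments=use_deprecated_read through the program arguments into PipelineOptionsFactory, so the runner expands the KinesisIO/KafkaIO read via the legacy UnboundedSource path instead of the SDF wrapper. The class name and the surrounding main method are illustrative, not taken from the thread.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ReadWithDeprecatedRead {
  public static void main(String[] args) {
    // Run with e.g.: --runner=FlinkRunner --experiments=use_deprecated_read ...
    // PipelineOptionsFactory picks the experiment up from the program arguments;
    // the pipeline must be built from these options for the flag to take effect.
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline pipeline = Pipeline.create(options);
    // ... build the KinesisIO/KafkaIO read and the rest of the pipeline here ...
    pipeline.run().waitUntilFinish();
  }
}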
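On the allowed-lateness point (no lateness set means Duration.ZERO, so elements arriving after the end of the window are dropped), a sketch of the same ten-minute fixed windowing with an explicit allowed lateness. The one-hour value and the discarding accumulation mode are illustrative choices, and MyClass stands in for the element type from the thread.

import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

class WindowingWithLateness {
  // events: the parsed PCollection upstream of the FileIO write.
  static PCollection<MyClass> apply(PCollection<MyClass> events) {
    return events.apply(
        Window.<MyClass>into(FixedWindows.of(Duration.standardMinutes(10)))
            // Without this, allowed lateness defaults to ZERO and late elements are dropped.
            .withAllowedLateness(Duration.standardHours(1))
            // Fired panes are emitted once and then discarded (illustrative choice).
            .discardingFiredPanes());
  }
}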
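Eddy's buffering stage is only loosely described (state plus a PROCESSING_TIME timer whose @OnTimer should flush and clear it), so the following is a generic sketch of that pattern rather than his actual code. The key and element types, the five-minute flush interval, and the class name are assumptions.

import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

// Stateful DoFn: the input must be keyed (KV) so state and timers are per key.
// A coder for MyClass must be inferable or registered for the bag state.
class BufferBeforeWrite extends DoFn<KV<String, MyClass>, MyClass> {

  @StateId("buffer")
  private final StateSpec<BagState<MyClass>> bufferSpec = StateSpecs.bag();

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(
      @Element KV<String, MyClass> element,
      @StateId("buffer") BagState<MyClass> buffer,
      @TimerId("flush") Timer flush) {
    buffer.add(element.getValue());
    // (Re)arm a processing-time timer that fires ~5 minutes after the last arming.
    flush.offset(Duration.standardMinutes(5)).setRelative();
  }

  @OnTimer("flush")
  public void onFlush(
      OutputReceiver<MyClass> out,
      @StateId("buffer") BagState<MyClass> buffer) {
    // Emit everything buffered for this key, then clear the state so it does not grow.
    for (MyClass value : buffer.read()) {
      out.output(value);
    }
    buffer.clear();
  }
}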
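To look for the droppedDueToLateness counter programmatically when it is not visible in the Flink UI or Prometheus, something like the following can be tried. Whether metrics are queryable this way, and whether attempted values are populated, depends on the runner; the substring match is a workaround for the metric namespace varying between transforms.

import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;

class LatenessDropCheck {
  // Scan all counters reported for the pipeline and print any whose name
  // mentions droppedDueToLateness.
  static void printDroppedDueToLateness(PipelineResult result) {
    MetricQueryResults metrics =
        result.metrics().queryMetrics(MetricsFilter.builder().build());
    for (MetricResult<Long> counter : metrics.getCounters()) {
      if (counter.getName().getName().contains("droppedDueToLateness")) {
        System.out.println(counter.getName() + " = " + counter.getAttempted());
      }
    }
  }
}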
