Hi Nikola,

Apart from the potential overhead you mentioned of having one more operator, I cannot think of any other, and even that one is, I think, negligible.

The reason we recommend attaching the Watermark Generator to the source is more about semantics than efficiency. It seems natural for a pipeline whose logic depends on event time to have its Watermarks generated at the source.
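As a rough illustration of why the placement should not change much, here is a minimal plain-Java sketch (not Flink code) of what a bounded-out-of-orderness generator computes for a single stream of records. The names WatermarkSketch, BoundedGenerator and watermarksFor are hypothetical, and the formula (largest timestamp seen, minus the bound, minus 1 ms) is an assumption modelled on how I understand the built-in strategy to behave. The same strategy is used in both of your solutions; only the place in the pipeline where it runs differs.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; none of these names are Flink classes.
final class WatermarkSketch {

    /** Hypothetical stand-in for a bounded-out-of-orderness watermark generator. */
    static final class BoundedGenerator {
        private final long boundMillis;
        private long maxTimestamp;

        BoundedGenerator(long boundMillis) {
            this.boundMillis = boundMillis;
            // Start low, but high enough that currentWatermark() cannot underflow.
            this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
        }

        /** Remember the largest event timestamp seen so far. */
        void onEvent(long eventTimestamp) {
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        }

        /** Assumed formula: largest timestamp seen, minus the bound, minus 1 ms. */
        long currentWatermark() {
            return maxTimestamp - boundMillis - 1;
        }
    }

    /**
     * Feeds the timestamps through one generator and records the watermark
     * after each record. (A real job would emit watermarks periodically;
     * reading it after every record just keeps the sketch short.)
     */
    static List<Long> watermarksFor(long[] timestamps, long boundMillis) {
        BoundedGenerator generator = new BoundedGenerator(boundMillis);
        List<Long> watermarks = new ArrayList<>();
        for (long ts : timestamps) {
            generator.onEvent(ts);
            watermarks.add(generator.currentWatermark());
        }
        return watermarks;
    }

    public static void main(String[] args) {
        // Event timestamps in milliseconds; the third record arrives out of order.
        long[] timestamps = {25_000L, 60_000L, 40_000L, 90_000L};
        // 20 s bound, matching Duration.ofSeconds(20) in the thread.
        System.out.println(watermarksFor(timestamps, 20_000L));
    }
}
```

With a 20 second bound, the out-of-order record at 40 000 ms does not move the watermark, because the largest timestamp seen is still 60 000 ms.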
Cheers,
Kostas

On Sun, Nov 8, 2020 at 8:14 PM Nikola Hrusov <n.hru...@gmail.com> wrote:
>
> Hi,
>
> I am reading about the watermark creation of the kafka streams using the
> article here:
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>
> In there, it is a given example where the watermark assigner is directly
> attached to the consumer like so (solution 1):
>
>> FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic",
>>     new SimpleStringSchema(), properties);
>> myConsumer.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)));
>> env.addSource(myConsumer)....
>
> Then we can use that by adding it as a source and continue with the
> application.
>
> My question is, would that have any/much difference against doing it after
> the source? Something like this (solution 2):
>
>> FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic",
>>     new SimpleStringSchema(), properties);
>> env
>>     .addSource(myConsumer)
>>     .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)));
>
> I can eventually think that it would create an extra operator, but is there
> any other [unnecessary] overhead that solution 2 will give over solution 1?
> I tried running a simple job, but I couldn't see much difference. I would
> like to know if there is something I am unaware of and I can do better.
>
> Regards,
> Nikola Hrusov
>