Understanding Kafka watermark strategy assigner

2020-11-08 Thread Nikola Hrusov
Hi,

I am reading about watermark creation for Kafka streams in the article
here:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission

In there, there is an example where the watermark assigner is attached
directly to the consumer, like so (solution 1):

> FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic",
>         new SimpleStringSchema(), properties);
>
> myConsumer.assignTimestampsAndWatermarks(
>         WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)));
> env.addSource(myConsumer);


Then we add it as a source and continue with the application.

My question is: would that make any/much difference compared to doing it
after the source? Something like this (solution 2):


> FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic",
>         new SimpleStringSchema(), properties);
>
> env
>     .addSource(myConsumer)
>     .assignTimestampsAndWatermarks(
>         WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)));

I can see that it would create an extra operator, but is there any other
[unnecessary] overhead that solution 2 introduces over solution 1?
I tried running a simple job but couldn't see much difference. I would
like to know if there is something I am unaware of that I could do better.
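
For illustration, a minimal sketch of one way to look at this: build
solution 2 and print the generated plan instead of executing the job.
The broker address, group id and topic name below are placeholders, and
the closing comment states an expectation rather than a verified result.

import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class PlanInspection {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        properties.setProperty("group.id", "plan-inspection");         // placeholder

        FlinkKafkaConsumer<String> myConsumer =
                new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);

        // Solution 2: the watermark assigner is applied on the DataStream
        // returned by the source, not on the consumer itself.
        env.addSource(myConsumer)
           .assignTimestampsAndWatermarks(
                   WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(20)))
           .print();

        // Print the JSON plan instead of executing; the operator list shows
        // whether a separate "Timestamps/Watermarks" node is created after
        // the source (it is usually chained into the same task anyway).
        System.out.println(env.getExecutionPlan());
    }
}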

Regards,
Nikola Hrusov


Re: Understanding Kafka watermark strategy assigner

2020-11-09 Thread Kostas Kloudas
Hi Nikola,

Apart from the potential overhead you mentioned of having one more
operator, I cannot think of any other, and even that one is, I think,
negligible.
The reason we recommend attaching the watermark generator to the source
is more about semantics than efficiency: it seems natural for a pipeline
whose logic depends on event time to have its watermarks generated at
the source.
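
As an illustration of that recommended shape, here is a minimal sketch
in which the watermark strategy is attached to the Kafka source and an
event-time window downstream relies on the watermarks it emits. The
broker address, group id, topic name, key and window size are
placeholders, not taken from the thread.

import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class WatermarksAtSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        properties.setProperty("group.id", "watermarks-at-source");    // placeholder

        FlinkKafkaConsumer<String> myConsumer =
                new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);

        // Solution 1: watermarks are generated inside the Kafka source.
        myConsumer.assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(20)));

        DataStream<String> stream = env.addSource(myConsumer);

        // Downstream event-time logic consumes the watermarks emitted by the
        // source, e.g. a tumbling event-time window over an arbitrary key.
        stream.keyBy(value -> value)
              .window(TumblingEventTimeWindows.of(Time.minutes(1)))
              .reduce((a, b) -> a + b)
              .print();

        env.execute("watermarks-at-source");
    }
}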

Cheers,
Kostas
