Hi there, 

Right now I'm in the process of upgrading our Flink 1.9 jobs to Flink 1.11. 

In Flink 1.9, I was able to write an AssignerWithPeriodicWatermarks which also 
extended AbstractRichFunction and could thus use state and 
getRuntimeContext() in there. This worked because the 
TimestampsAndWatermarksOperator was an AbstractUdfStreamOperator and passed my 
assigner in as the userFunction to that operator. 

I used this feature for some "per partition processing", which Flink isn't 
ideally suited for at the moment, I guess. We have ascending watermarks 
per Kafka partition and do some processing on that. In order to maintain state 
per Kafka partition, I now keyBy the Kafka partition in our stream (not ideal, 
but better than operator state in terms of rescaling), but afterwards I need to 
emulate the watermark strategy of the initial Kafka source, i.e. reassign 
watermarks the same way the Kafka source did (per Kafka partition within the 
operator). Via getRuntimeContext() I was able to identify the Kafka partitions 
that one operator instance was responsible for and could produce the output 
watermark accordingly (the minimum over all partitions it is responsible for). 
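
To make the watermark logic concrete, here is a minimal plain-Java sketch of 
the "min over all responsible partitions" computation. It deliberately uses no 
Flink classes; the class name PartitionWatermarkTracker and its methods are 
made up for illustration, and it assumes the set of partitions an operator 
instance is responsible for is already known (in my 1.9 job that came from 
getRuntimeContext()). 

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not a Flink class: tracks the highest timestamp seen
// per Kafka partition and derives the combined output watermark.
class PartitionWatermarkTracker {

    // Highest timestamp observed so far for each tracked partition.
    private final Map<Integer, Long> maxTimestampPerPartition = new HashMap<>();

    // assignedPartitions: the partitions this operator instance is responsible for.
    PartitionWatermarkTracker(Collection<Integer> assignedPartitions) {
        for (Integer partition : assignedPartitions) {
            // Long.MIN_VALUE marks "no record seen yet" for this partition.
            maxTimestampPerPartition.put(partition, Long.MIN_VALUE);
        }
    }

    // Record an event; timestamps are assumed to be ascending per partition,
    // but taking the max keeps the tracker monotonic even if they are not.
    // A partition that was not registered up front is simply added.
    void onEvent(int partition, long timestamp) {
        maxTimestampPerPartition.merge(partition, timestamp, Math::max);
    }

    // Output watermark: the minimum over all tracked partitions. It stays at
    // Long.MIN_VALUE until every partition has delivered at least one record.
    long currentWatermark() {
        if (maxTimestampPerPartition.isEmpty()) {
            return Long.MIN_VALUE;
        }
        long min = Long.MAX_VALUE;
        for (long timestamp : maxTimestampPerPartition.values()) {
            min = Math.min(min, timestamp);
        }
        return min;
    }
}
```

In the old setup, onEvent would correspond to what happens in 
extractTimestamp(), and currentWatermark() to what getCurrentWatermark() 
returns. 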

In Flink 1.11, how can I rebuild this behavior? Do I really need to build my 
own TimestampsAndWatermarksOperator which works like the old one? Or is there a 
better approach? 

Best regards 
Theo 
