Hi there,

In my use case, I read data from Kafka where, within each Kafka partition, timestamps are ascending. Currently, I parse the data from Kafka with a custom deserialization schema, so that after parsing, the FlinkKafkaConsumerBase can extract the ascending event-time timestamps and create proper watermarks via the WatermarkMultiplexer (i.e., take the minimum watermark over all non-idling assigned partitions in that task).
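For reference, the logic I would have to replicate downstream might look roughly like the following sketch. All names here are invented for illustration, and idleness detection is simplified to a processing-time timeout per partition:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the per-partition watermark multiplexing described
// above: track the latest ascending timestamp per partition and report, as
// the combined watermark, the minimum over all non-idle partitions.
class PartitionWatermarkMultiplexer {
    private final Map<Integer, Long> partitionWatermarks = new HashMap<>();
    private final Map<Integer, Long> lastActivity = new HashMap<>();
    private final long idleTimeoutMillis;

    PartitionWatermarkMultiplexer(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    // Called for every parsed record; timestamps are ascending per partition,
    // so the per-partition watermark is simply the latest timestamp seen.
    void onEvent(int partition, long eventTimestamp, long processingTime) {
        partitionWatermarks.merge(partition, eventTimestamp, Math::max);
        lastActivity.put(partition, processingTime);
    }

    // Combined watermark: minimum over partitions with recent activity.
    // Partitions that have been silent longer than the timeout are treated
    // as idle and excluded, so they don't hold back the watermark.
    long currentWatermark(long processingTime) {
        long min = Long.MAX_VALUE;
        for (Map.Entry<Integer, Long> e : partitionWatermarks.entrySet()) {
            long last = lastActivity.getOrDefault(e.getKey(), Long.MIN_VALUE);
            if (processingTime - last <= idleTimeoutMillis) {
                min = Math.min(min, e.getValue());
            }
        }
        return min == Long.MAX_VALUE ? Long.MIN_VALUE : min;
    }
}
```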
Now, we have a strange requirement: the parser can change at runtime, and only via the new parser can I extract the timestamp field from the received byte[]. The parser change is communicated to me via another Kafka topic. I immediately thought: that's a perfect use case for broadcast streams. I connect the parser config stream and, in the connected function, buffer events that the old parser cannot parse until a new parser arrives. (This doesn't sound great architecturally, but that's how it is.)

My problem is the following: if I want to use a broadcast stream, I must move my parser into a new pipeline step and no longer parse within the KafkaDeserializationSchema. This also means that Flink/Kafka can't produce the watermarks, and I need to emulate the nice per-partition ascending watermark assigner with the downstream multiplexer myself.

Am I correct? Can I "easily" plug this timestamp assigner with multiplexer logic into my stream (after the broadcast parsing step)? Could it also detect idle partitions like the KafkaConsumer does? Or which way would you go?

The only alternative I see is to greatly increase the complexity of my KafkaDeserializationSchema so that it also reads another Kafka topic in the background and buffers elements internally. That doesn't sound very "flinkish".

Best regards
Theo