Hey We have a flinkapp which is subscribing to multiple topics, we recently upgraded our application from 1.13 to 1.15, which we started to use KafkaSource instead of FlinkKafkaConsumer (deprecated).
But we noticed some weird issue with KafkaSource after the upgrade, we are setting the topics with the kafkaSource builder like this ``` KafkaSource .builder[CustomEvent] .setBootstrapServers(p.bootstrapServers) .setGroupId(consumerGroupName) .setDeserializer(deserializer) .setTopics(topics) ``` And we pass in a list of topics to subscribe, but from time to time we will add some new topics or remove some topics (stop consuming them), but we noticed that ever since we upgraded to 1.15, when we remove a topic from the list, it somehow still consuming the topic (committed offset to the already unsubscribed topics, we also have some logs and metrics showing that we are still consuming the already removed topic), and from the aws.kafka.sum_offset_lag metric, we can also see the removed topic having negative lag... And if we delete the topic in kafka, the running flink application will crash and throw an error " saying the partition cannot be found (because the topic is already deleted from Kafka). We'd like to understand what could have caused this and if this is a bug in KafkaSource? When we were in 1.13, this never occurred, we were able to remove topics without any issues. We also tried to upgrade to flink 1.17, but the same issue occurred.