Hi, That's by design: you can't dynamically add and remove topics from an existing Flink job that is being restarted from a snapshot. The feature you're looking for is being planned as part of FLIP-246 [1]
Best regards, Martijn [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217389320 On Wed, Nov 1, 2023 at 7:29 AM Emily Li via user <user@flink.apache.org> wrote: > > Hey > > We have a flinkapp which is subscribing to multiple topics, we recently > upgraded our application from 1.13 to 1.15, which we started to use > KafkaSource instead of FlinkKafkaConsumer (deprecated). > > But we noticed some weird issue with KafkaSource after the upgrade, we are > setting the topics with the kafkaSource builder like this > > ``` > > KafkaSource > > .builder[CustomEvent] > > .setBootstrapServers(p.bootstrapServers) > > .setGroupId(consumerGroupName) > > .setDeserializer(deserializer) > > .setTopics(topics) > ``` > > And we pass in a list of topics to subscribe, but from time to time we will > add some new topics or remove some topics (stop consuming them), but we > noticed that ever since we upgraded to 1.15, when we remove a topic from the > list, it somehow still consuming the topic (committed offset to the already > unsubscribed topics, we also have some logs and metrics showing that we are > still consuming the already removed topic), and from the > aws.kafka.sum_offset_lag metric, we can also see the removed topic having > negative lag... > > > And if we delete the topic in kafka, the running flink application will crash > and throw an error " > > saying the partition cannot be found (because the topic is already deleted > from Kafka). > > > We'd like to understand what could have caused this and if this is a bug in > KafkaSource? > > > When we were in 1.13, this never occurred, we were able to remove topics > without any issues. > > > We also tried to upgrade to flink 1.17, but the same issue occurred.