Hi,

That's by design: you can't dynamically add or remove topics from an
existing Flink job that is being restarted from a snapshot. The
feature you're looking for is being planned as part of FLIP-246 [1].
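
As a rough illustration of why a removed topic keeps being read, here is a
toy model in plain Scala. It is not Flink's actual code. It assumes that the
partitions assigned to the source are stored in the snapshot and resumed on
restore, regardless of the topic list passed to setTopics. The names
TopicPartition, RestoreModel, resumedAfterRestore and staleAfterRestore are
made up for this sketch.

```scala
// Toy model only: these types are hypothetical and are not Flink classes.
final case class TopicPartition(topic: String, partition: Int)

object RestoreModel {
  // Assumption: partitions recorded in the snapshot are resumed as-is on
  // restore, whatever the newly configured topic list says.
  def resumedAfterRestore(snapshot: Set[TopicPartition]): Set[TopicPartition] =
    snapshot

  // Partitions that are still read although their topic is no longer in
  // the configured topic list.
  def staleAfterRestore(
      snapshot: Set[TopicPartition],
      configuredTopics: Set[String]): Set[TopicPartition] =
    resumedAfterRestore(snapshot).filterNot(tp => configuredTopics.contains(tp.topic))
}

object RestoreModelDemo extends App {
  // State captured while the job still subscribed to "orders" and "legacy".
  val snapshot = Set(
    TopicPartition("orders", 0),
    TopicPartition("orders", 1),
    TopicPartition("legacy", 0))

  // "legacy" was removed from the topic list before the restart.
  println(RestoreModel.staleAfterRestore(snapshot, Set("orders")))
}
```

Under that assumption, the "legacy" partition remains in the resumed set
after the restart, which would match the committed offsets and metrics you
are still seeing for the removed topic.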

Best regards,

Martijn

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217389320


On Wed, Nov 1, 2023 at 7:29 AM Emily Li via user <user@flink.apache.org> wrote:
>
> Hey
>
> We have a Flink app that subscribes to multiple topics. We recently
> upgraded our application from 1.13 to 1.15 and, as part of that, started
> using KafkaSource instead of the deprecated FlinkKafkaConsumer.
>
> After the upgrade we noticed a strange issue with KafkaSource. We set the
> topics with the KafkaSource builder like this:
>
> ```
> KafkaSource
>   .builder[CustomEvent]
>   .setBootstrapServers(p.bootstrapServers)
>   .setGroupId(consumerGroupName)
>   .setDeserializer(deserializer)
>   .setTopics(topics)
> ```
>
> We pass in a list of topics to subscribe to, and from time to time we add
> new topics or remove some (to stop consuming them). Ever since we upgraded
> to 1.15, when we remove a topic from the list, the job somehow keeps
> consuming it: offsets are still committed for the removed topic, and our
> logs and metrics show that it is still being read. In the
> aws.kafka.sum_offset_lag metric we can also see the removed topic
> reporting negative lag...
>
>
> If we delete the topic in Kafka, the running Flink application crashes
> with an error saying the partition cannot be found (because the topic has
> already been deleted from Kafka).
>
>
> We'd like to understand what could have caused this, and whether it is a
> bug in KafkaSource.
>
>
> When we were on 1.13, this never occurred; we were able to remove topics
> without any issues.
>
>
> We also tried upgrading to Flink 1.17, but the same issue occurred.
