Hey Martijn

Thanks for the clarification. Now it makes sense.

I saw that FLIP-246 is still a work in progress with no release date yet,
and that it contains quite a lot of changes. We also noticed there is a WIP
PR for it. Is there any plan or rough timeline for releasing this feature?

In our current setup we subscribe to hundreds of topics, and we add or
remove topics quite often (probably every few days). Adding topics seems to
be fine at the moment. With the current KafkaSource design, however,
removing a topic means we need to change the Kafka source ID and restart
with non-restored state. I assume that means we also lose the state of all
the other topics. Because we need to do this so often, repeatedly
restarting the application with non-restored state is quite inconvenient.

We are thinking of introducing a temporary workaround while waiting for
dynamic topic addition/removal (probably by forking flink-connector-kafka
and adding some custom logic there). Is there any direction you can point
us in if we go down that route, or any pre-existing work that we could
potentially reuse?
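
One possible shape for such custom logic, as a rough sketch only: on
restore, drop any restored partition offsets whose topic is no longer in the
configured list, rather than continuing to consume them. The snippet below
is plain Scala with no Flink or Kafka dependencies. `TopicPartition`,
`retainSubscribed`, and `removedTopics` are hypothetical names made up for
illustration and are not part of flink-connector-kafka; where such a filter
would actually hook into the connector is left open.

```scala
// Hypothetical sketch: none of these names come from flink-connector-kafka.
object TopicFilterSketch {

  // Stand-in for Kafka's TopicPartition so the example has no dependencies.
  final case class TopicPartition(topic: String, partition: Int)

  // Keep only the restored partition offsets whose topic is still subscribed.
  def retainSubscribed(
      restored: Map[TopicPartition, Long],
      subscribedTopics: Set[String]
  ): Map[TopicPartition, Long] =
    restored.filter { case (tp, _) => subscribedTopics.contains(tp.topic) }

  // Topics that appear in the restored state but are no longer configured.
  def removedTopics(
      restored: Map[TopicPartition, Long],
      subscribedTopics: Set[String]
  ): Set[String] =
    restored.keySet.map(_.topic).diff(subscribedTopics)

  def main(args: Array[String]): Unit = {
    // Made-up topic names and offsets, for illustration only.
    val restored = Map(
      TopicPartition("orders", 0) -> 120L,
      TopicPartition("orders", 1) -> 95L,
      TopicPartition("legacy-events", 0) -> 300L
    )
    val subscribed = Set("orders", "payments")

    println(s"Dropped topics: ${removedTopics(restored, subscribed)}")
    println(s"Kept partitions: ${retainSubscribed(restored, subscribed).keySet}")
  }
}
```

The point of keeping the filter a pure function is that it could be tested
in isolation before being wired into a forked connector.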

On Thu, Nov 2, 2023 at 3:30 AM Martijn Visser <martijnvis...@apache.org>
wrote:

> Hi,
>
> That's by design: you can't dynamically add and remove topics from an
> existing Flink job that is being restarted from a snapshot. The
> feature you're looking for is being planned as part of FLIP-246 [1]
>
> Best regards,
>
> Martijn
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217389320
>
>
> On Wed, Nov 1, 2023 at 7:29 AM Emily Li via user <user@flink.apache.org>
> wrote:
> >
> > Hey
> >
> > We have a Flink app that subscribes to multiple topics. We recently
> > upgraded it from 1.13 to 1.15 and, as part of that, started using
> > KafkaSource instead of the deprecated FlinkKafkaConsumer.
> >
> > But we noticed a weird issue with KafkaSource after the upgrade. We
> > set the topics with the KafkaSource builder like this:
> >
> > ```
> > KafkaSource
> >   .builder[CustomEvent]
> >   .setBootstrapServers(p.bootstrapServers)
> >   .setGroupId(consumerGroupName)
> >   .setDeserializer(deserializer)
> >   .setTopics(topics)
> > ```
> >
> > We pass in a list of topics to subscribe to, and from time to time we
> > add new topics or remove some (stop consuming them). Ever since we
> > upgraded to 1.15, when we remove a topic from the list, the job somehow
> > keeps consuming it: offsets are still committed for the unsubscribed
> > topic, our logs and metrics show that we are still consuming it, and the
> > aws.kafka.sum_offset_lag metric shows the removed topic with negative
> > lag...
> >
> >
> > And if we delete the topic in kafka, the running flink application will
> crash and throw an error "
> >
> > saying the partition cannot be found (because the topic is already
> deleted from Kafka).
> >
> >
> > We'd like to understand what could have caused this and if this is a bug
> in KafkaSource?
> >
> >
> > On 1.13 this never occurred; we were able to remove topics without any
> > issues.
> >
> >
> > We also tried to upgrade to flink 1.17, but the same issue occurred.
>
