Hi Emily

One workaround that might help is to leverage the State Processor API [1].
You would have to do some upfront work to create a state-processor job that
wipes the state (offsets) of the topic or topics you want to remove, and
then restart from the newly generated savepoint, which no longer contains
that state. The job could even be parameterized to make it more generic and
thus reusable across multiple jobs.
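
As a rough illustration, a minimal sketch of such a job is below (Scala,
against the 1.15 State Processor API). To keep it simple it drops the
*entire* state of the Kafka source operator, so on restore the source
starts from its configured starting offsets for all topics; selectively
rewriting the split state of a single topic would mean reading and
re-serializing the source's splits, which this sketch does not attempt.
The uid "kafka-source" is only illustrative and must match whatever uid()
you set on the KafkaSource, and the exact method signatures differ a bit
between Flink versions (1.17 uses OperatorIdentifier, for example), so
double-check the State Processor API docs for your release.

```
// Sketch only: copy an existing savepoint, minus the Kafka source's operator state.
// Assumptions: flink-state-processor-api 1.15 on the classpath, and "kafka-source"
// is the uid() assigned to the KafkaSource operator in the original job.
import org.apache.flink.state.api.SavepointWriter

object DropKafkaSourceState {
  def main(args: Array[String]): Unit = {
    val oldSavepoint = args(0) // savepoint taken from the running job
    val newSavepoint = args(1) // where to write the rewritten savepoint
    val sourceUid    = if (args.length > 2) args(2) else "kafka-source" // parameterize for reuse

    // Everything except the given operator's state is carried over as-is.
    // Depending on the Flink version you may also need to pass a
    // StreamExecutionEnvironment and trigger execution explicitly.
    SavepointWriter
      .fromExistingSavepoint(oldSavepoint)
      .removeOperator(sourceUid)
      .write(newSavepoint)
  }
}
```

You would then restart the original job from the rewritten savepoint; since
it no longer contains any state for the source's uid, no
--allowNonRestoredState flag should be needed.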

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/state_processor_api/#state-processor-api

Hope that helps
-Hector


On Thu, Nov 2, 2023 at 7:25 AM Emily Li via user <user@flink.apache.org>
wrote:

> Hey Martijn
>
> Thanks for the clarification. Now it makes sense.
>
> I saw that FLIP-246 is still a WIP with no release date yet, and it
> actually contains quite a few changes. We noticed there's a WIP PR for
> this change; just wondering if there's any plan for releasing this
> feature?
>
> For our current situation, we are subscribing to hundreds of topics, and
> we add/remove topics quite often (every few days or so). Adding topics
> seems to be okay at the moment, but with the current KafkaSource design,
> removing a topic means we need to change the Kafka source uid and restart
> with non-restored state. I assume that means we will lose the state of the
> other topics as well, and because we need to do this quite often, it seems
> quite inconvenient to keep restarting the application with non-restored
> state.
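>
> (For reference, the restart we mean looks roughly like the sketch below;
> the uid values and the source name are only illustrative.)
>
> ```
> // Sketch of the "non-restored state" restart: give the KafkaSource a new
> // operator uid so its old state (including the splits of the removed
> // topics) is not restored, then resubmit from the savepoint with
> // `flink run -s <savepoint> --allowNonRestoredState ...` so the orphaned
> // state of the old uid is dropped instead of failing the restore.
> val source = KafkaSource
>   .builder[CustomEvent]
>   .setBootstrapServers(p.bootstrapServers)
>   .setGroupId(consumerGroupName)
>   .setDeserializer(deserializer)
>   .setTopics(topics) // the shrunken topic list
>   .build()
>
> env
>   .fromSource(source, WatermarkStrategy.noWatermarks(), "events")
>   .uid("kafka-source-v2") // bumped from "kafka-source-v1" when topics were removed
> ```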
>
> We are thinking of introducing a temporary workaround while waiting for
> this dynamic topic adding/removing feature (probably by forking
> flink-connector-kafka and adding some custom logic there). Just wondering
> if there's any direction you can point us in if we go ahead with the
> workaround, or any pre-existing work that we could potentially reuse?
>
> On Thu, Nov 2, 2023 at 3:30 AM Martijn Visser <martijnvis...@apache.org>
> wrote:
>
>> Hi,
>>
>> That's by design: you can't dynamically add and remove topics from an
>> existing Flink job that is being restarted from a snapshot. The
>> feature you're looking for is being planned as part of FLIP-246 [1].
>>
>> Best regards,
>>
>> Martijn
>>
>> [1]
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217389320
>>
>>
>> On Wed, Nov 1, 2023 at 7:29 AM Emily Li via user <user@flink.apache.org>
>> wrote:
>> >
>> > Hey
>> >
>> > We have a Flink app which subscribes to multiple topics. We recently
>> > upgraded the application from 1.13 to 1.15, where we started to use
>> > KafkaSource instead of the deprecated FlinkKafkaConsumer.
>> >
>> > But we noticed a weird issue with KafkaSource after the upgrade. We are
>> > setting the topics with the KafkaSource builder like this:
>> >
>> > ```
>> > KafkaSource
>> >   .builder[CustomEvent]
>> >   .setBootstrapServers(p.bootstrapServers)
>> >   .setGroupId(consumerGroupName)
>> >   .setDeserializer(deserializer)
>> >   .setTopics(topics)
>> > ```
>> >
>> > And we pass in a list of topics to subscribe to, but from time to time
>> > we will add some new topics or remove some (stop consuming them). We
>> > noticed that ever since we upgraded to 1.15, when we remove a topic from
>> > the list, the job is somehow still consuming it: it commits offsets to
>> > the already unsubscribed topic, we also have logs and metrics showing
>> > that we are still consuming the removed topic, and from the
>> > aws.kafka.sum_offset_lag metric we can also see the removed topic having
>> > negative lag...
>> >
>> >
>> > And if we delete the topic in Kafka, the running Flink application will
>> > crash and throw an error saying the partition cannot be found (because
>> > the topic has already been deleted from Kafka).
>> >
>> >
>> > We'd like to understand what could have caused this, and whether this
>> > is a bug in KafkaSource.
>> >
>> >
>> > When we were on 1.13 this never occurred; we were able to remove topics
>> > without any issues.
>> >
>> >
>> > We also tried upgrading to Flink 1.17, but the same issue occurred.
>>
>
