Re: Issues about removed topics with KafkaSource

2023-11-02 Thread Hector Rios
Hi Emily

One workaround that might help is to use the State Processor API [1].
You would have to do some upfront work to create a state-processor job that
wipes the state (offsets) of the topic you want to remove, and then restart
from the newly generated savepoint, which no longer contains the state of
the removed topic or topics. The job could even be parameterized to make it
more generic and thus reusable across multiple jobs.
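
To make the idea concrete, here is a minimal sketch in plain Scala of only the
filtering step such a job would perform. It deliberately does not use the State
Processor API: reading the old savepoint and writing the new one would still
have to be done with that API, and how KafkaSource lays out its state is not
shown here. The `TopicPartition` case class, the `dropTopics` helper, and the
topic names are all hypothetical and exist only for illustration.

```scala
// Hypothetical model of the per-partition offsets held in a savepoint.
// These types are illustrative only; they are not Flink or Kafka classes.
final case class TopicPartition(topic: String, partition: Int)

object RemoveTopicState {

  /** Returns the offsets that remain after dropping every partition of the removed topics. */
  def dropTopics(
      offsets: Map[TopicPartition, Long],
      removedTopics: Set[String]): Map[TopicPartition, Long] =
    offsets.filter { case (tp, _) => !removedTopics.contains(tp.topic) }

  def main(args: Array[String]): Unit = {
    // Made-up offsets standing in for what was read from the old savepoint.
    val restored = Map(
      TopicPartition("orders", 0) -> 120L,
      TopicPartition("orders", 1) -> 98L,
      TopicPartition("legacy-events", 0) -> 4500L
    )

    // In a parameterized job, this set would come from the job arguments.
    val cleaned = dropTopics(restored, Set("legacy-events"))

    cleaned.toSeq
      .sortBy { case (tp, _) => (tp.topic, tp.partition) }
      .foreach { case (tp, offset) => println(s"${tp.topic}-${tp.partition} -> $offset") }
  }
}
```

Passing the removed topics in as a parameter is what would let one such job be
reused across several applications.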

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/libs/state_processor_api/#state-processor-api

Hope that helps
-Hector




Re: Issues about removed topics with KafkaSource

2023-11-02 Thread Emily Li via user
Hey Martijn

Thanks for the clarification. Now it makes sense.

I saw that FLIP-246 is still a WIP with no release date yet, and that it
contains quite a lot of changes. We noticed there's a WIP PR for this
change. Is there any plan for releasing this feature?

In our current situation, we subscribe to hundreds of topics, and we
add/remove topics quite often (probably every few days). Adding topics
seems to be okay at the moment. But with the current KafkaSource design, if
removing a topic means we need to change the Kafka source ID and restart
with non-restored state, I assume we will lose the state of the other
topics as well. Because we need to do this quite often, it seems quite
inconvenient to keep restarting the application with non-restored state.

We are thinking of introducing a temporary workaround while waiting for
the dynamic adding/removing topics feature (probably by forking
flink-connector-kafka and adding some custom logic there). Is there any
direction you can point us in if we do the workaround, or is there any
pre-existing work that we could potentially reuse?



Re: Issues about removed topics with KafkaSource

2023-11-01 Thread Martijn Visser
Hi,

That's by design: you can't dynamically add and remove topics from an
existing Flink job that is being restarted from a snapshot. The
feature you're looking for is being planned as part of FLIP-246 [1]
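
A simplified model of why a removed topic keeps being consumed after a restore
is sketched below. It assumes that the partitions stored in the snapshot are
resumed as they are, and that the configured topic list is only used to
discover additional partitions. The `activePartitions` function and the topic
names are hypothetical and do not mirror the connector's actual classes.

```scala
object RestoreModel {

  /** Partitions that are read after a restore, in this simplified model:
    * everything found in the snapshot, plus any partition of a configured
    * topic that currently exists in Kafka.
    */
  def activePartitions(
      restored: Set[(String, Int)],
      configuredTopics: Set[String],
      partitionsInKafka: Set[(String, Int)]): Set[(String, Int)] = {
    val discovered = partitionsInKafka.filter { case (topic, _) =>
      configuredTopics.contains(topic)
    }
    // Nothing here drops restored partitions whose topic is no longer configured.
    restored ++ discovered
  }

  def main(args: Array[String]): Unit = {
    val restored = Set(("orders", 0), ("legacy-events", 0))
    val inKafka  = Set(("orders", 0), ("orders", 1), ("legacy-events", 0))

    // "legacy-events" was removed from the topic list before restarting.
    val active = activePartitions(restored, Set("orders"), inKafka)

    println(active.contains(("legacy-events", 0)))
  }
}
```

In this model the removed topic only disappears if the job starts without the
restored state, or if the state itself is rewritten.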

Best regards,

Martijn

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217389320




Issues about removed topics with KafkaSource

2023-11-01 Thread Emily Li via user
Hey

We have a Flink app that subscribes to multiple topics. We recently
upgraded our application from 1.13 to 1.15, and as part of that we started
using KafkaSource instead of FlinkKafkaConsumer (deprecated).

Since the upgrade we have noticed a weird issue with KafkaSource. We
set the topics with the KafkaSource builder like this:

```
KafkaSource
  .builder[CustomEvent]
  .setBootstrapServers(p.bootstrapServers)
  .setGroupId(consumerGroupName)
  .setDeserializer(deserializer)
  .setTopics(topics)
```

We pass in a list of topics to subscribe to, and from time to time we
add some new topics or remove some topics (stop consuming them). But we
noticed that ever since we upgraded to 1.15, when we remove a topic from
the list, the job somehow still consumes it: offsets are still committed
to the already unsubscribed topic, and we have logs and metrics showing
that we are still consuming the removed topic. In the
aws.kafka.sum_offset_lag metric, we can also see the removed topic
having negative lag...

And if we delete the topic in Kafka, the running Flink application
crashes with an error saying the partition cannot be found (because the
topic is already deleted from Kafka).

We'd like to understand what could have caused this, and whether this is a
bug in KafkaSource.

When we were on 1.13, this never occurred; we were able to remove topics
without any issues.

We also tried upgrading to Flink 1.17, but the same issue occurred.