Hey

We have a flinkapp which is subscribing to multiple topics, we recently
upgraded our application from 1.13 to 1.15, which we started to use
KafkaSource instead of FlinkKafkaConsumer (deprecated).

But we noticed some weird issue with KafkaSource after the upgrade, we are
setting the topics with the kafkaSource builder like this

```

KafkaSource

  .builder[CustomEvent]

  .setBootstrapServers(p.bootstrapServers)

  .setGroupId(consumerGroupName)

  .setDeserializer(deserializer)

  .setTopics(topics)
```

And we pass in a list of topics to subscribe, but from time to time we will
add some new topics or remove some topics (stop consuming them), but we
noticed that ever since we upgraded to 1.15, when we remove a topic from
the list, it somehow still consuming the topic (committed offset to the
already unsubscribed topics, we also have some logs and metrics showing
that we are still consuming the already removed topic), and from
the aws.kafka.sum_offset_lag metric, we can also see the removed topic
having negative lag...


And if we delete the topic in kafka, the running flink application will
crash and throw an error "

saying the partition cannot be found (because the topic is already deleted
from Kafka).


We'd like to understand what could have caused this and if this is a bug in
KafkaSource?


When we were in 1.13, this never occurred, we were able to remove topics
without any issues.


We also tried to upgrade to flink 1.17, but the same issue occurred.

Reply via email to