Francis created FLINK-35283:
-------------------------------
Summary: Add support for unique Kafka producer client ids
Key: FLINK-35283
URL: https://issues.apache.org/jira/browse/FLINK-35283
Project: Flink
Issue Type: Improvement
Components: Connectors / Kafka
Reporter: Francis
This issue came out of debugging a warning we're seeing in our Flink logs. We're
running Flink 1.18 and have an application that uses Kafka topics as a source
and a sink. We're running with several tasks. The warning we're seeing in the
logs is:
```
WARN org.apache.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=kafka producer client id
```
I've spent a bit of time debugging, and it looks like the root cause of this
warning is the Flink `KafkaSink` creating multiple `KafkaWriter`s that, in
turn, create multiple `KafkaProducer`s with the same Kafka producer
`client.id`. The value of `client.id` is used in the name under which the
`AppInfo` MBean is registered, so when multiple `KafkaProducer`s with the same
`client.id` register in the same JVM, every registration after the first fails
with the above `InstanceAlreadyExistsException`. Since we're running with
several tasks and we get a Kafka producer per task, this duplicate
registration exception makes sense to me.
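The duplicate registration can be reproduced with plain JMX, without Kafka or Flink on the classpath. The following is a minimal sketch, not Kafka's actual `AppInfoParser` code: the class `DuplicateMBeanDemo`, the `demo.producer` domain, and the `register` helper are made up for illustration. It only shows that a second MBean registered under the same object name in one JVM is rejected.

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

class DuplicateMBeanDemo {
    // Standard MBean convention: the interface is named <ImplClass>MBean.
    public interface AppInfoMBean {
        String getId();
    }

    public static class AppInfo implements AppInfoMBean {
        private final String id;

        public AppInfo(String id) {
            this.id = id;
        }

        @Override
        public String getId() {
            return id;
        }
    }

    /**
     * Registers an MBean whose object name embeds the client id.
     * Returns false if that name is already taken in this JVM.
     * The "demo.producer" domain is a stand-in, not Kafka's real domain.
     */
    static boolean register(String clientId) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        try {
            ObjectName name =
                new ObjectName("demo.producer:type=app-info,id=" + clientId);
            server.registerMBean(new AppInfo(clientId), name);
            return true;
        } catch (InstanceAlreadyExistsException e) {
            // Same failure mode as in the warning above.
            return false;
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(register("my-client"));   // first producer
        System.out.println(register("my-client"));   // second producer, same id
        System.out.println(register("my-client-1")); // distinct id
    }
}
```

The first and third calls succeed and the second is rejected, which is why giving each producer a distinct `client.id` would make the warning go away.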
I'm wondering if the fix would be to update the `KafkaSink.builder` by adding
a `setClientIdPrefix` method, similar to what we have already on the
`KafkaSource.builder`.
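To make the idea concrete, here is a rough sketch of how a prefix could be turned into a per-subtask `client.id`. It is not existing connector code: the class `ClientIdPrefixSketch`, the helper `withUniqueClientId`, and the `prefix-subtaskId` naming scheme are assumptions for illustration. Only the `client.id` property key is a real Kafka setting.

```java
import java.util.Properties;

class ClientIdPrefixSketch {
    /**
     * Returns a copy of the producer properties with a client.id derived
     * from a user-supplied prefix and the subtask index.
     * Hypothetical helper; the naming scheme is an assumption.
     */
    static Properties withUniqueClientId(
            Properties base, String clientIdPrefix, int subtaskId) {
        Properties copy = new Properties();
        copy.putAll(base); // leave the caller's properties untouched
        copy.setProperty("client.id", clientIdPrefix + "-" + subtaskId);
        return copy;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("bootstrap.servers", "localhost:9092");

        // Each subtask derives its own id from the shared prefix.
        for (int subtask = 0; subtask < 3; subtask++) {
            Properties p = withUniqueClientId(base, "orders-sink", subtask);
            System.out.println(p.getProperty("client.id"));
        }
    }
}
```

If a single writer can create more than one producer, the subtask index alone may not be enough, and an additional per-producer suffix would likely be needed.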