Francis created FLINK-35283:
-------------------------------

             Summary: Add support for unique Kafka producer client ids
                 Key: FLINK-35283
                 URL: https://issues.apache.org/jira/browse/FLINK-35283
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Kafka
            Reporter: Francis


This issue came out of debugging a warning we're seeing in our Flink logs. We're 
running Flink 1.18 and have an application that uses Kafka topics as a source 
and a sink. We're running with several tasks. The warning we're seeing in the 
logs is:

```
WARN org.apache.kafka.common.utils.AppInfoParser - Error registering AppInfo 
mbean
javax.management.InstanceAlreadyExistsException: 
kafka.producer:type=app-info,id=kafka producer client id
```

I've spent a bit of time debugging, and it looks like the root cause of this 
warning is the Flink `KafkaSink` creating multiple `KafkaWriter`s that, in 
turn, create multiple `KafkaProducer`s with the same Kafka producer 
`client.id`. The value of `client.id` is used when registering the `AppInfo` 
MBean, so when multiple `KafkaProducer`s with the same `client.id` are 
registered we get the above `InstanceAlreadyExistsException`. Since we're 
running with several tasks and get a Kafka producer per task, this duplicate 
registration exception makes sense to me.
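
To illustrate the mechanism outside of Flink and Kafka, here is a minimal 
JDK-only sketch. The `AppInfo` bean, the `demo.producer` domain and the client 
ids are made up for the example; it only assumes that the MBean name is derived 
from the client id, as the warning above suggests.

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class DuplicateMBeanDemo {
    // Hypothetical stand-in for Kafka's app-info bean; not the real class.
    public interface AppInfoMBean {
        String getVersion();
    }

    public static class AppInfo implements AppInfoMBean {
        @Override
        public String getVersion() {
            return "demo";
        }
    }

    /** Returns true if the bean was registered, false if the name was already taken. */
    public static boolean register(String clientId) throws JMException {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // The object name depends only on the client id, so equal ids collide.
        ObjectName name = new ObjectName("demo.producer:type=app-info,id=" + clientId);
        try {
            server.registerMBean(new AppInfo(), name);
            return true;
        } catch (InstanceAlreadyExistsException e) {
            return false;
        }
    }

    public static void main(String[] args) throws JMException {
        System.out.println(register("shared-id"));   // first registration
        System.out.println(register("shared-id"));   // same id: rejected as a duplicate
        System.out.println(register("shared-id-1")); // distinct id: registers fine
    }
}
```

In this sketch the second call plays the role of the second `KafkaProducer` in 
the same JVM: the registration is rejected, but nothing else fails, which 
matches this showing up only as a warning.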


I'm wondering if the fix would be to update `KafkaSink.builder` by adding 
a `setClientIdPrefix` method, similar to what we already have on 
`KafkaSource.builder`.
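
As a rough sketch of what such a prefix could do on the sink side, the helper 
below derives a per-subtask `client.id` from a prefix. The class name, the 
method name and the `prefix-subtaskIndex` naming scheme are assumptions for 
illustration only, not existing Flink API; the only real name used is the Kafka 
`client.id` config key.

```java
import java.util.Properties;

public class ClientIdSketch {
    // Standard Kafka client config key for the client id.
    static final String CLIENT_ID_KEY = "client.id";

    /**
     * Hypothetical helper: returns a copy of the producer properties whose
     * client.id is "<prefix>-<subtaskIndex>", leaving the input untouched.
     */
    public static Properties withUniqueClientId(Properties base, String prefix, int subtaskIndex) {
        Properties copy = new Properties();
        copy.putAll(base);
        copy.setProperty(CLIENT_ID_KEY, prefix + "-" + subtaskIndex);
        return copy;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        for (int subtask = 0; subtask < 3; subtask++) {
            Properties props = withUniqueClientId(base, "my-sink", subtask);
            System.out.println(props.getProperty(CLIENT_ID_KEY));
        }
    }
}
```

With distinct ids per subtask, each producer would register its MBean under a 
different name, so the duplicate registration should no longer occur.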



