[ 
https://issues.apache.org/jira/browse/FLINK-28842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17870916#comment-17870916
 ] 

Rich commented on FLINK-28842:
------------------------------

{quote}In case you experience a warning with a stack trace containing 
javax.management.InstanceAlreadyExistsException: kafka.consumer:[...], you are 
probably trying to register multiple KafkaConsumers with the same client.id. 
The warning indicates that not all available metrics are correctly forwarded to 
the metrics system. You must ensure that a different client.id.prefix for every 
KafkaSource is configured and that no other KafkaConsumer in your job uses the 
same client.id.
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/#kafka-consumer-metrics
{quote}
It makes sense to add `client.id.prefix` to the KafkaSink to mitigate issues 
like `javax.management.InstanceAlreadyExistsException`. Aligning with the 
KafkaSource by using `prefix + subTaskId` is logical, and adding an atomic 
counter (`prefix + subTaskId + atomicCounter`) ensures unique client IDs even 
in pooled environments.

I am willing to take on this as an alternative PR if no one else volunteers.

> Add client.id.prefix for the KafkaSink
> --------------------------------------
>
>                 Key: FLINK-28842
>                 URL: https://issues.apache.org/jira/browse/FLINK-28842
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / Kafka
>    Affects Versions: 1.15.1
>            Reporter: Yaroslav Tkachenko
>            Assignee: Yaroslav Tkachenko
>            Priority: Major
>              Labels: pull-request-available, stale-assigned
>
> Currently, KafkaSink doesn't provide a way to configure a client.id.prefix 
> like KafkaSource does. client.id is as important for Kafka Producers, so it 
> makes sense to implement the missing logic for the KafkaSink. 
> A similar implementation that leverages subtaskId for uniqueness can be used 
> here.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to