Sergey Lemekhov created KAFKA-13519:

             Summary: Same JAAS configuration used for all producers
                 Key: KAFKA-13519
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 3.0.0
            Reporter: Sergey Lemekhov

h3. Problem

Sending messages to more than one Kafka cluster is impossible when using 
instances of {{org.apache.kafka.clients.producer.KafkaProducer}} from 
{{kafka-clients}} Java library with {{SASL_JAAS_CONFIG}} authentication 
Only one {{}} is 
created for all of the clusters 
 map contains only one entry).
h3. How to reproduce

Create two {{KafkaProducer}} instances with the following configuration 
(producers should use different kafka clusters and have different 
{{ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}} setting):
Properties properties = new Properties();
properties.put("security.protocol", "SASL_SSL");
properties.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
AzureAuthCallbackHandler.class); //custom class for handling callbacks. in my 
case it is Azure Event Hubs with Kafka API support

//here custom configuration is set for callback handler settings
""; + tenantId); //azure tenant id
properties.put(AzureAuthCallbackHandler.APP_ID_CONFIG, appId); //azure oauth 
2.0 app id
properties.put(AzureAuthCallbackHandler.APP_SECRET_CONFIG, appSecret); //azure 
oauth 2.0 app secret
Here {{AzureAuthCallbackHandler}} is a custom class which takes care of 
acquiring tokens from Azure. It is configured to fetch tokens from the same 
host that used as {{ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}} setting and class 
instance should be different for each created {{{}KafkaProducer{}}}. However it 
is created only once by the client library (for the only {{{}LoginManager{}}}) 
and used for all producers.

When using both producers for sending messages this leads to a lot of:
[Producer clientId=my-client-id] Error while fetching metadata with correlation 
id 164 : {my-topic=UNKNOWN_TOPIC_OR_PARTITION}
and finally to:
org.apache.kafka.common.errors.TimeoutException: Topic my-topic not present in 
metadata after 60000 ms.
The second producer tries to fetch metadata from the cluster configured for the 
first producer and can't find target topic there.
h3. Workaround

Add a unique jaas config option for each Kafka cluster:
String options = "cluster=" + clusterName; //clusterName should be unique for 
each created KafkaProducer instance
" required " 
+ options + ";");
LoginManagers map uses {{SASL_JAAS_CONFIG}} string as a part of the key so 
adding a meaningless option to the string makes client library create different 
{{LoginManager}} for each {{KafkaProducer}} instance and the problem disappears.
This is done in 
 method: a {{LoginMetadata}} instance is created and configured with 
{{SASL_JAAS_CONFIG}} value and used as the key in 
{{LoginManager.DYNAMIC_INSTANCES}} map.
h3. Suggested solution

Each {{KafkaProducer}} instance should have individual isolated authentication 
handling objects linked to it regardless of their similarities in 
configuration. The {{SASL_LOGIN_CALLBACK_HANDLER_CLASS}} class should be 
instantiated for each producer individually (since its 
method is invoked with producer's configuration which could be different from 
one producer to another).

h3. Additional details
I've attached callback handler and token implementation for reference.

This message was sent by Atlassian Jira

Reply via email to