Sergey Lemekhov created KAFKA-13519:
---------------------------------------

             Summary: Same JAAS configuration used for all producers
                 Key: KAFKA-13519
                 URL: https://issues.apache.org/jira/browse/KAFKA-13519
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 3.0.0
            Reporter: Sergey Lemekhov
         Attachments: AzureAuthCallbackHandler.java, OAuthBearerTokenImpl.java

h3. Problem

Sending messages to more than one Kafka cluster does not work when using 
instances of {{org.apache.kafka.clients.producer.KafkaProducer}} from the 
{{kafka-clients}} Java library with {{SASL_JAAS_CONFIG}} authentication 
configured.
Only one {{org.apache.kafka.common.security.authenticator.LoginManager}} is 
created for all of the clusters (the 
{{org.apache.kafka.common.security.authenticator.LoginManager#DYNAMIC_INSTANCES}} 
map contains only one entry).
h3. How to reproduce

Create two {{KafkaProducer}} instances with the following configuration 
(the producers should use different Kafka clusters and have different 
{{ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}} settings):
{code:java}
Properties properties = new Properties();
properties.put("security.protocol", "SASL_SSL");
properties.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
// Custom class for handling callbacks; in my case it is for Azure Event Hubs with Kafka API support.
properties.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, AzureAuthCallbackHandler.class);
properties.put(SaslConfigs.SASL_JAAS_CONFIG,
    "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;");

// Custom configuration for the callback handler settings.
properties.put(AzureAuthCallbackHandler.AUTHORITY_CONFIG,
    "https://login.microsoftonline.com/" + tenantId); // Azure tenant id
properties.put(AzureAuthCallbackHandler.APP_ID_CONFIG, appId); // Azure OAuth 2.0 app id
properties.put(AzureAuthCallbackHandler.APP_SECRET_CONFIG, appSecret); // Azure OAuth 2.0 app secret
{code}
Here {{AzureAuthCallbackHandler}} is a custom class which takes care of 
acquiring tokens from Azure. It is configured to fetch tokens from the same 
host that is used as the {{ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}} setting, 
so a separate class instance is needed for each created {{KafkaProducer}}. 
However, it is created only once by the client library (for the only 
{{LoginManager}}) and used for all producers.

Sending messages with both producers then produces many log entries like:
{code:java}
[Producer clientId=my-client-id] Error while fetching metadata with correlation id 164 : {my-topic=UNKNOWN_TOPIC_OR_PARTITION}
{code}
and finally to:
{code:java}
org.apache.kafka.common.errors.TimeoutException: Topic my-topic not present in metadata after 60000 ms.
{code}
The second producer tries to fetch metadata from the cluster configured for the 
first producer and can't find the target topic there.
h3. Workaround

Add a unique JAAS config option for each Kafka cluster:
{code:java}
// clusterName should be unique for each created KafkaProducer instance
String options = "cluster=" + clusterName;
properties.put(SaslConfigs.SASL_JAAS_CONFIG,
    "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required "
    + options + ";");
{code}
The {{LoginManager}} map uses the {{SASL_JAAS_CONFIG}} string as part of the 
key, so adding a meaningless option to the string makes the client library 
create a different {{LoginManager}} for each {{KafkaProducer}} instance, and 
the problem disappears.
The key is built in the 
{{org.apache.kafka.common.security.authenticator.LoginManager#acquireLoginManager}} 
method: a {{LoginMetadata}} instance is created and configured with the 
{{SASL_JAAS_CONFIG}} value and used as the key in the 
{{LoginManager.DYNAMIC_INSTANCES}} map.
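
The caching behaviour described above can be illustrated with a small model. This is only a sketch, not the code from {{kafka-clients}}: {{LoginManagerCacheSketch}}, {{FakeLoginManager}}, {{acquire}} and {{jaasConfigFor}} are hypothetical names, and the key is assumed to consist of the JAAS config text plus the callback handler class name. The point it shows is that the bootstrap servers are not part of the key, so two producers with identical JAAS text share one instance, while a unique option separates them.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified, hypothetical model of a login-manager cache; this is not Kafka's code.
final class LoginManagerCacheSketch {

    // Stand-in for a LoginManager: records the cluster it was first created for.
    static final class FakeLoginManager {
        final String createdFor;

        FakeLoginManager(String createdFor) {
            this.createdFor = createdFor;
        }
    }

    // Assumed key: JAAS config text plus callback handler class name.
    private static final Map<List<String>, FakeLoginManager> INSTANCES = new ConcurrentHashMap<>();

    static FakeLoginManager acquire(String jaasConfig, String handlerClass, String bootstrapServers) {
        // bootstrapServers is deliberately absent from the key, so it cannot separate two clusters.
        return INSTANCES.computeIfAbsent(
                List.of(jaasConfig, handlerClass),
                key -> new FakeLoginManager(bootstrapServers));
    }

    // The workaround: a per-cluster option makes the JAAS text, and so the key, unique.
    static String jaasConfigFor(String clusterName) {
        return "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required cluster="
                + clusterName + ";";
    }

    public static void main(String[] args) {
        String shared = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;";
        FakeLoginManager a = acquire(shared, "AzureAuthCallbackHandler", "cluster-a:9093");
        FakeLoginManager b = acquire(shared, "AzureAuthCallbackHandler", "cluster-b:9093");
        System.out.println("shared instance: " + (a == b) + ", created for " + b.createdFor);

        FakeLoginManager c = acquire(jaasConfigFor("a"), "AzureAuthCallbackHandler", "cluster-a:9093");
        FakeLoginManager d = acquire(jaasConfigFor("b"), "AzureAuthCallbackHandler", "cluster-b:9093");
        System.out.println("separate instances: " + (c != d));
    }
}
```

In this model the second producer receives the instance that was created for the first cluster, which matches the symptom in the report; with the per-cluster option each producer gets its own instance.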
h3. Suggested solution

Each {{KafkaProducer}} instance should have its own isolated authentication 
handling objects, regardless of how similar the producers' configurations 
are. The {{SASL_LOGIN_CALLBACK_HANDLER_CLASS}} class should be 
instantiated for each producer individually, since its 
{{org.apache.kafka.common.security.auth.AuthenticateCallbackHandler#configure}} 
method is invoked with the producer's configuration, which can differ from 
one producer to another.
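
A minimal sketch of the isolation proposed here, under stated assumptions: {{PerClientHandlerSketch}}, {{FakeCallbackHandler}} and {{newHandlerFor}} are hypothetical stand-ins rather than Kafka classes, and the handler is assumed to read only the {{bootstrap.servers}} entry from the client configuration. It shows a fresh handler being created and configured per client even when the rest of the configuration is identical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of per-client handler isolation; none of these types are Kafka classes.
final class PerClientHandlerSketch {

    // Stand-in for a callback handler that is configured from its client's settings.
    static final class FakeCallbackHandler {
        private String tokenHost;

        void configure(Map<String, ?> clientConfig) {
            // Assumption: the handler derives its token host from bootstrap.servers.
            this.tokenHost = String.valueOf(clientConfig.get("bootstrap.servers"));
        }

        String tokenHost() {
            return tokenHost;
        }
    }

    // Always builds and configures a new handler; nothing is cached or shared between clients.
    static FakeCallbackHandler newHandlerFor(Map<String, ?> clientConfig) {
        FakeCallbackHandler handler = new FakeCallbackHandler();
        handler.configure(clientConfig);
        return handler;
    }

    public static void main(String[] args) {
        Map<String, Object> first = new HashMap<>();
        first.put("bootstrap.servers", "cluster-a:9093");
        Map<String, Object> second = new HashMap<>();
        second.put("bootstrap.servers", "cluster-b:9093");

        FakeCallbackHandler a = newHandlerFor(first);
        FakeCallbackHandler b = newHandlerFor(second);
        System.out.println(a.tokenHost() + " / " + b.tokenHost());
    }
}
```

Whether the real fix should drop the cache entirely or extend its key is a design decision for the maintainers; the sketch only shows the behaviour the report asks for.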

h3. Additional details
I've attached the callback handler and token implementations for reference.



