Sergey Lemekhov created KAFKA-13519: ---------------------------------------
Summary: Same JAAS configuration used for all producers Key: KAFKA-13519 URL: https://issues.apache.org/jira/browse/KAFKA-13519 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 3.0.0 Reporter: Sergey Lemekhov Attachments: AzureAuthCallbackHandler.java, OAuthBearerTokenImpl.java h3. Problem Sending messages to more than one Kafka cluster is impossible when using instances of {{org.apache.kafka.clients.producer.KafkaProducer}} from {{kafka-clients}} Java library with {{SASL_JAAS_CONFIG}} authentication configured. Only one {{org.apache.kafka.common.security.authenticator.LoginManager}} is created for all of the clusters ({{{}org.apache.kafka.common.security.authenticator.LoginManager#DYNAMIC_INSTANCES{}}} map contains only one entry). h3. How to reproduce Create two {{KafkaProducer}} instances with the following configuration (producers should use different kafka clusters and have different {{ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}} setting): {code:java} Properties properties = new Properties(); properties.put("security.protocol", "SASL_SSL"); properties.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER"); properties.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, AzureAuthCallbackHandler.class); //custom class for handling callbacks. in my case it is Azure Event Hubs with Kafka API support properties.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"); //here custom configuration is set for callback handler settings properties.put(AzureAuthCallbackHandler.AUTHORITY_CONFIG, "https://login.microsoftonline.com/" + tenantId); //azure tenant id properties.put(AzureAuthCallbackHandler.APP_ID_CONFIG, appId); //azure oauth 2.0 app id properties.put(AzureAuthCallbackHandler.APP_SECRET_CONFIG, appSecret); //azure oauth 2.0 app secret {code} Here {{AzureAuthCallbackHandler}} is a custom class which takes care of acquiring tokens from Azure. It is configured to fetch tokens from the same host that used as {{ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}} setting and class instance should be different for each created {{{}KafkaProducer{}}}. However it is created only once by the client library (for the only {{{}LoginManager{}}}) and used for all producers. When using both producers for sending messages this leads to a lot of: {code:java} [Producer clientId=my-client-id] Error while fetching metadata with correlation id 164 : {my-topic=UNKNOWN_TOPIC_OR_PARTITION} {code} and finally to: {code:java} org.apache.kafka.common.errors.TimeoutException: Topic my-topic not present in metadata after 60000 ms. {code} The second producer tries to fetch metadata from the cluster configured for the first producer and can't find target topic there. h3. Workaround Add a unique jaas config option for each Kafka cluster: {code:java} String options = "cluster=" + clusterName; //clusterName should be unique for each created KafkaProducer instance properties.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required " + options + ";"); {code} LoginManagers map uses {{SASL_JAAS_CONFIG}} string as a part of the key so adding a meaningless option to the string makes client library create different {{LoginManager}} for each {{KafkaProducer}} instance and the problem disappears. This is done in {{org.apache.kafka.common.security.authenticator.LoginManager#acquireLoginManager}} method: a {{LoginMetadata}} instance is created and configured with {{SASL_JAAS_CONFIG}} value and used as the key in {{LoginManager.DYNAMIC_INSTANCES}} map. h3. Suggested solution Each {{KafkaProducer}} instance should have individual isolated authentication handling objects linked to it regardless of their similarities in configuration. The {{SASL_LOGIN_CALLBACK_HANDLER_CLASS}} class should be instantiated for each producer individually (since its {{org.apache.kafka.common.security.auth.AuthenticateCallbackHandler#configure}} method is invoked with producer's configuration which could be different from one producer to another). h3. Additional details I've attached callback handler and token implementation for reference. -- This message was sent by Atlassian Jira (v8.20.1#820001)