[
https://issues.apache.org/jira/browse/KAFKA-13422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17489744#comment-17489744
]
Guozhang Wang commented on KAFKA-13422:
---------------------------------------
I'm unfortunately less familiar with the security.auth module. [~rsivaram] [~ijuma],
could you please chime in with your thoughts?
> Even if the correct username and password are configured, when ClientBroker
> or KafkaClient tries to establish a SASL connection to ServerBroker, an
> exception is thrown: (Authentication failed: Invalid username or password)
> ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-13422
> URL: https://issues.apache.org/jira/browse/KAFKA-13422
> Project: Kafka
> Issue Type: Bug
> Components: clients, core
> Affects Versions: 2.7.1, 3.0.0
> Reporter: RivenSun
> Priority: Major
> Attachments: CustomerAuthCallbackHandler.java,
> LoginContext_login_debug.png, SaslClientCallbackHandler_handle_debug.png
>
>
>
> h1. Foreword:
> While deploying a Kafka cluster on a recent version (2.7.1), I ran into
> inter-broker authentication failures. The problem also reproduces on the
> current latest version, 3.0.0.
> h1. Reproducing the problem:
> h2. 1) Broker version 3.0.0
> h3. kafka_server_jaas.conf is identical on every broker; its content is as
> follows:
>
>
> {code:java}
> KafkaServer {
> org.apache.kafka.common.security.plain.PlainLoginModule required
> username="admin"
> password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
> user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
> user_alice="alice";
> org.apache.kafka.common.security.scram.ScramLoginModule required
> username="admin_scram"
> password="admin_scram_password";
>
> };
> {code}
>
>
> h3. broker server.properties:
> One broker's configuration file is shown below; the other brokers' files
> differ only in the localPublicIp used in advertised.listeners.
>
> {code:java}
> broker.id=1
> broker.rack=us-east-1a
> advertised.listeners=SASL_PLAINTEXT://localPublicIp:9779,SASL_SSL://localPublicIp:9889,INTERNAL_SSL://:9009,PLAIN_PLUGIN_SSL://localPublicIp:9669
> log.dirs=/asyncmq/kafka/data_1,/asyncmq/kafka/data_2
> zookeeper.connect=***
> listeners=SASL_PLAINTEXT://:9779,SASL_SSL://:9889,INTERNAL_SSL://:9009,PLAIN_PLUGIN_SSL://:9669
> listener.security.protocol.map=INTERNAL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,PLAIN_PLUGIN_SSL:SASL_SSL
> listener.name.plain_plugin_ssl.plain.sasl.server.callback.handler.class=org.apache.kafka.common.security.plain.internals.PlainServerCallbackHandler
> #ssl config
> ssl.keystore.password=***
> ssl.key.password=***
> ssl.truststore.password=***
> ssl.keystore.location=***
> ssl.truststore.location=***
> ssl.client.auth=none
> ssl.endpoint.identification.algorithm=
> #broker communicate config
> #security.inter.broker.protocol=SASL_PLAINTEXT
> inter.broker.listener.name=INTERNAL_SSL
> sasl.mechanism.inter.broker.protocol=PLAIN
> #sasl authentication config
> sasl.kerberos.service.name=kafka
> sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,GSSAPI
> delegation.token.master.key=***
> delegation.token.expiry.time.ms=86400000
> delegation.token.max.lifetime.ms=3153600000000
> {code}
>
>
> Then start all brokers at the same time. Every broker actually starts
> successfully, but authentication keeps failing whenever the controller node
> tries to connect to the brokers. Inter-broker connections cannot be
> established, so the whole Kafka cluster is unable to serve clients.
> h3. The server log keeps printing this error:
> (The brokers' real IP addresses are replaced with ****** below.)
>
> {code:java}
> [2021-10-29 14:16:19,831] INFO [SocketServer listenerType=ZK_BROKER,
> nodeId=3] Started socket server acceptors and processors
> (kafka.network.SocketServer)
> [2021-10-29 14:16:19,836] INFO Kafka version: 3.0.0
> (org.apache.kafka.common.utils.AppInfoParser)
> [2021-10-29 14:16:19,836] INFO Kafka commitId: 8cb0a5e9d3441962
> (org.apache.kafka.common.utils.AppInfoParser)
> [2021-10-29 14:16:19,836] INFO Kafka startTimeMs: 1635516979831
> (org.apache.kafka.common.utils.AppInfoParser)
> [2021-10-29 14:16:19,837] INFO [KafkaServer id=3] started
> (kafka.server.KafkaServer)
> [2021-10-29 14:16:20,249] INFO [SocketServer listenerType=ZK_BROKER,
> nodeId=3] Failed authentication with /****** (Authentication failed: Invalid
> username or password) (org.apache.kafka.common.network.Selector)
> [2021-10-29 14:16:20,680] INFO [SocketServer listenerType=ZK_BROKER,
> nodeId=3] Failed authentication with /****** (Authentication failed: Invalid
> username or password) (org.apache.kafka.common.network.Selector)
> [2021-10-29 14:16:21,109] INFO [SocketServer listenerType=ZK_BROKER,
> nodeId=3] Failed authentication with /****** (Authentication failed: Invalid
> username or password) (org.apache.kafka.common.network.Selector)
> {code}
>
>
> h2. 2) Change the password of the PlainLoginModule used for inter-broker
> communication in kafka_server_jaas.conf
> The change: kJTVDziatPgjXG82sFHc4O1EIuewmlvS --> DziatPgjXG82sFHc4O1EIuewmlvS
>
> {code:java}
> KafkaServer {
> org.apache.kafka.common.security.plain.PlainLoginModule required
> username="admin"
> password="DziatPgjXG82sFHc4O1EIuewmlvS"
> user_admin="DziatPgjXG82sFHc4O1EIuewmlvS"
> user_alice="alice";
> org.apache.kafka.common.security.scram.ScramLoginModule required
> username="admin_scram"
> password="admin_scram_password";
>
> };
> {code}
>
>
>
> After restarting all broker machines, connections between the controller and
> all brokers are established normally, which can be verified with
> netstat -anp | grep 9009
> (Real IPs are again replaced with ******.)
>
> {code:java}
> [root@ip-10-30-0-64 kafka]# netstat -anp | grep 9009
> tcp6 0 0 :::9009 :::* LISTEN
> 24852/java
> tcp6 0 0 ******:47502 ******:9009 ESTABLISHED
> 24852/java
> tcp6 0 0 ******:9009 ******:41164 ESTABLISHED
> 24852/java
> tcp6 0 0 ******:9009 ******:41168 ESTABLISHED
> 24852/java
> {code}
>
> The whole cluster can now serve clients, which can be verified by creating a
> topic with bin/kafka-topics.sh.
> h1. Preliminary guesses:
> 1) Does the admin password kJTVDziatPgjXG82sFHc4O1EIuewmlvS contain special
> characters that Kafka does not allow in passwords?
> 2) Does the content of kafka_server_jaas.conf deviate from the standard
> format documented on the Kafka website?
> 3) Do the line endings in kafka_server_jaas.conf differ from standard Linux
> newlines?
> After investigation and analysis, none of these guesses holds:
> 1) kJTVDziatPgjXG82sFHc4O1EIuewmlvS contains no characters that Kafka
> disallows in passwords.
> 2) The content of kafka_server_jaas.conf follows the standard format; see
> [https://kafka.apache.org/documentation/#security_sasl]
> 3) Opening kafka_server_jaas.conf in vim and running :set list shows that
> every line ends with a standard Linux newline.
>
> To push the analysis further, consider what the log is telling us: it keeps
> printing "Invalid username or password". So what username & password does
> ClientBroker actually send, and what username & password does ServerBroker
> expect?
>
> h1. sasl.server.callback.handler.class implementation
>
> Following the
> [sasl.server.callback.handler.class|https://kafka.apache.org/documentation/#brokerconfigs_sasl.server.callback.handler.class]
> documentation, I wrote CustomerAuthCallbackHandler, an implementation of the
> AuthenticateCallbackHandler interface.
> Its code is almost identical to Kafka's default implementation,
> PlainServerCallbackHandler, except that logging is temporarily added to the
> authenticate method. The complete code is in the attachment
> "CustomerAuthCallbackHandler.java":
>
> {code:java}
> private final static Logger log = LoggerFactory.getLogger(CustomerAuthCallbackHandler.class);
> ......
> protected boolean authenticate(String username, char[] password) throws IOException {
>     if (username == null)
>         return false;
>     else {
>         String expectedPassword = JaasContext.configEntryOption(jaasConfigEntries,
>                 JAAS_USER_PREFIX + username, PlainLoginModule.class.getName());
>         boolean authenticateSuccess = expectedPassword != null &&
>                 Utils.isEqualConstantTime(password, expectedPassword.toCharArray());
>         log.info("CustomerAuthCallbackHandler authenticate [{}] | user [{}] password is [{}] , expectedPassword is [{}] ",
>                 authenticateSuccess, username, new String(password), expectedPassword);
>         return authenticateSuccess;
>     }
> }
> {code}
>
>
>
>
> Add one line of configuration to each broker's server.properties:
>
> {code:java}
> listener.name.internal_ssl.plain.sasl.server.callback.handler.class=us.zoom.mq.security.plain.CustomerAuthCallbackHandler
> {code}
>
> Roll the admin password in kafka_server_jaas.conf back to its original value:
>
> {code:java}
> KafkaServer {
> org.apache.kafka.common.security.plain.PlainLoginModule required
> username="admin"
> password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
> user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
> user_alice="alice";
> org.apache.kafka.common.security.scram.ScramLoginModule required
> username="admin_scram"
> password="admin_scram_password";
>
> };
> {code}
>
>
> Restart all brokers and observe the log (real broker IPs replaced with
> ******):
>
> {code:java}
> [2021-10-29 15:31:09,886] INFO [SocketServer listenerType=ZK_BROKER,
> nodeId=3] Started data-plane acceptor and processor(s) for endpoint :
> ListenerName(SASL_PLAINTEXT) (kafka.network.SocketServer)
> [2021-10-29 15:31:09,925] INFO [SocketServer listenerType=ZK_BROKER,
> nodeId=3] Started data-plane acceptor and processor(s) for endpoint :
> ListenerName(PLAIN_PLUGIN_SSL) (kafka.network.SocketServer)
> [2021-10-29 15:31:09,926] INFO [SocketServer listenerType=ZK_BROKER,
> nodeId=3] Started socket server acceptors and processors
> (kafka.network.SocketServer)
> [2021-10-29 15:31:09,932] INFO Kafka version: 3.0.0
> (org.apache.kafka.common.utils.AppInfoParser)
> [2021-10-29 15:31:09,932] INFO Kafka commitId: 8cb0a5e9d3441962
> (org.apache.kafka.common.utils.AppInfoParser)
> [2021-10-29 15:31:09,932] INFO Kafka startTimeMs: 1635521469926
> (org.apache.kafka.common.utils.AppInfoParser)
> [2021-10-29 15:31:09,933] INFO [KafkaServer id=3] started
> (kafka.server.KafkaServer)
> [2021-10-29 15:31:10,305] INFO CustomerAuthCallbackHandler authenticate
> [false] | user [admin] password is [admin_scram_password] , expectedPassword
> is [kJTVDziatPgjXG82sFHc4O1EIuewmlvS]
> (us.zoom.mq.security.plain.CustomerAuthCallbackHandler)
> [2021-10-29 15:31:10,306] INFO [SocketServer listenerType=ZK_BROKER,
> nodeId=3] Failed authentication with /****** (Authentication failed: Invalid
> username or password) (org.apache.kafka.common.network.Selector)
> [2021-10-29 15:31:10,734] INFO CustomerAuthCallbackHandler authenticate
> [false] | user [admin] password is [admin_scram_password] , expectedPassword
> is [kJTVDziatPgjXG82sFHc4O1EIuewmlvS]
> (us.zoom.mq.security.plain.CustomerAuthCallbackHandler)
> [2021-10-29 15:31:10,735] INFO [SocketServer listenerType=ZK_BROKER,
> nodeId=3] Failed authentication with /****** (Authentication failed: Invalid
> username or password) (org.apache.kafka.common.network.Selector)
> [2021-10-29 15:31:11,165] INFO CustomerAuthCallbackHandler authenticate
> [false] | user [admin] password is [admin_scram_password] , expectedPassword
> is [kJTVDziatPgjXG82sFHc4O1EIuewmlvS]
> (us.zoom.mq.security.plain.CustomerAuthCallbackHandler)
> [2021-10-29 15:31:11,165] INFO [SocketServer listenerType=ZK_BROKER,
> nodeId=3] Failed authentication with /****** (Authentication failed: Invalid
> username or password) (org.apache.kafka.common.network.Selector)
> [2021-10-29 15:31:11,596] INFO CustomerAuthCallbackHandler authenticate
> [false] | user [admin] password is [admin_scram_password] , expectedPassword
> is [kJTVDziatPgjXG82sFHc4O1EIuewmlvS]
> (us.zoom.mq.security.plain.CustomerAuthCallbackHandler)
> {code}
>
>
> The log now tells us clearly why authentication fails: when the inter-broker
> connection is established, the username sent by ClientBroker is correct, but
> the password does not match expectedPassword. The password actually sent is
> the one configured for the ScramLoginModule in kafka_server_jaas.conf.
> So we now know that ClientBroker sends the wrong password, but not yet why.
> To find the root cause (RC), we have to read the source code of the Kafka
> server startup process.
>
> h1. Kafka server startup process: source code analysis
> The startup entry point is the main method of the Scala class kafka.Kafka in
> the core project.
> KafkaServer's startup() initializes many key modules, such as logManager,
> socketServer, kafkaController, tokenManager and groupCoordinator.
> Within KafkaServer#startup(), the pieces of code that matter for this
> problem are:
> 1) socketServer.startup(startProcessingRequests = false)
> Creates the Acceptors and Processors for the ControlPlane and the DataPlane.
>
> 2) socketServer.startProcessingRequests(authorizerFutures)
> Starts the Acceptors and Processors for the ControlPlane and the DataPlane.
>
> 3) Tracing further into SocketServer#startAcceptorAndProcessors
> This method starts all Processor threads for each listener.name; see
> startProcessors(processors: Seq[Processor], processorThreadPrefix: String).
>
> 4) configureNewConnections() in the run method of the Processor thread class
> The key line in this method is
> selector.register(connectionId(channel.socket), channel)
> Following that call down:
> registerChannel(id, socketChannel, SelectionKey.OP_READ) -->
> KafkaChannel channel = buildAndAttachKafkaChannel(socketChannel, id, key) -->
> SaslChannelBuilder.buildChannel(...)
>
> 5) SaslChannelBuilder#buildChannel(...)
> In this method, the Mode determines whether buildServerAuthenticator or
> buildClientAuthenticator is called. The critical difference is that the
> former calls createSaslServer while the latter calls createSaslClient.
> The SaslServer is where ServerBroker verifies the user's password; for the
> PlainLoginModule this logic lives in the default implementation class
> PlainServerCallbackHandler. The SaslClient is where the client side obtains
> the user's password.
> Our focus is therefore buildClientAuthenticator, i.e. the case where the
> instance variable Mode mode of SaslChannelBuilder is CLIENT. So we can trace
> back to the constructor:
> SaslChannelBuilder public SaslChannelBuilder(...)
> 6) The constructor public SaslChannelBuilder(...)
> It is only called from the private static ChannelBuilder create(...) method
> of ChannelBuilders; tracing up from there leads to
> static ChannelBuilder clientChannelBuilder(...).
> At this point we can see that clientChannelBuilder(...) is called either by
> a broker acting as a client or by the ClientUtils class, the latter serving
> KafkaProducer/KafkaConsumer/KafkaAdminClient.
> To analyze the root cause further, I decided to use a KafkaProducer to
> simulate a ClientBroker connection. Their underlying mechanisms are almost
> identical; they differ only in a few configuration items that are
> broker-only and in the section of the static JAAS configuration they read.
> h1. Source code analysis of KafkaProducer's SASL authentication process
>
> ClientUtils#clientChannelBuilder(…) → ChannelBuilder#create(...) →
> SaslChannelBuilder#configure(Map<String, ?> configs)
> The key operations completed in these methods are:
> 1) JaasContext loading
> If the kafka_Client_jaas.conf file is specified via the
> java.security.auth.login.config system property, it is loaded into the
> JaasContext by the getAppConfigurationEntry(String var1) method of the
> sun.security.provider.ConfigFile class:
>
> {code:java}
> public AppConfigurationEntry[] getAppConfigurationEntry(String var1) {
> return this.spi.engineGetAppConfigurationEntry(var1);
> }
> {code}
>
>
> 2) createClientCallbackHandler(Map<String, ?> configs)
> The default ClientCallbackHandler implementation is
> SaslClientCallbackHandler. Its role is to let KafkaClient take the username
> and password from the authentication credentials saved in the Subject held
> by the LoginManager of its Channel. *This is particularly critical; we will
> analyze it in detail later.*
> 3) Initialization of LoginManager and loading of the LoginContext
> Tracing the constructor of LoginManager:
> private LoginManager(...) → AbstractLogin#login()
> There are two key operations in AbstractLogin#login():
>
> {code:java}
> @Override
> public LoginContext login() throws LoginException {
> loginContext = new LoginContext(contextName, null,
> loginCallbackHandler, configuration);
> loginContext.login();
> log.info("Successfully logged in.");
> return loginContext;
> }
> {code}
>
> The role of new LoginContext(...) is to
> (1) fetch the configuration from the JaasContext, where all the credentials
> from kafka_Client_jaas.conf are stored, and
> (2) initialize the instance variable moduleStack array.
>
> loginContext.login() → invokePriv(LOGIN_METHOD) → invoke(String methodName)
> These methods
> (1) update the Object module field of each element of the moduleStack array,
> and
> (2) obtain all public methods of each configured LoginModule class via
> reflection:
>
> {code:java}
> methods = moduleStack[i].module.getClass().getMethods();
> {code}
>
> The index of the initialize method within the methods array is found via the
> INIT_METHOD constant, and the initialize method of each configured
> LoginModule class is then invoked reflectively:
>
> {code:java}
> methods[mIndex].invoke(moduleStack[i].module, initArgs);
> {code}
>
> This method is very important. In invokePriv(LOGIN_METHOD), LoginContext
> loops over the moduleStack entries and executes the initialize method of
> every LoginModule you configured. The initialize methods of PlainLoginModule
> and ScramLoginModule each load the username & password configured for them
> in kafka_Client_jaas.conf into the Subject of the LoginManager that
> corresponds to the Channel. Inside the Subject, both credential fields use
> the SecureSet data structure, which stores its elements in a LinkedList.
> *So, key point: the elements of the two credential sets in the Subject are
> added in the order of the LoginModules listed in kafka_Client_jaas.conf.*
> 4) Back to the SaslClientCallbackHandler mentioned in step 2 above
> Where is this class used? When KafkaClient is about to initiate a
> connection, ClientFactoryImpl#createSaslClient(...) uses
> SaslClientCallbackHandler. The call stack is:
> NetworkClient#initiateConnect(Node node, long now)
> -->
> Selector#connect(String id, InetSocketAddress address, int sendBufferSize,
> int receiveBufferSize)
> -->
> Selector#registerChannel(String id, SocketChannel socketChannel, int
> interestedOps)
> -->
> Selector#buildAndAttachKafkaChannel(SocketChannel socketChannel, String id,
> SelectionKey key)
> -->
> SaslChannelBuilder#buildChannel(...) –>
> SaslChannelBuilder#buildClientAuthenticator
> -->
> SaslClientAuthenticator#SaslClient createSaslClient()
> -->
> Sasl.createSaslClient(...)
> -->
> ClientFactoryImpl#createSaslClient(...)
> The source code of ClientFactoryImpl#createSaslClient(...) is as follows:
> {code:java}
> public SaslClient createSaslClient(String[] var1, String var2, String var3,
>         String var4, Map<String, ?> var5, CallbackHandler var6) throws SaslException {
>     for (int var7 = 0; var7 < var1.length; ++var7) {
>         if (var1[var7].equals(myMechs[0]) && PolicyUtils.checkPolicy(mechPolicies[0], var5)) {
>             return new ExternalClient(var2);
>         }
>         Object[] var8;
>         if (var1[var7].equals(myMechs[1]) && PolicyUtils.checkPolicy(mechPolicies[1], var5)) {
>             var8 = this.getUserInfo("CRAM-MD5", var2, var6);
>             return new CramMD5Client((String)var8[0], (byte[])((byte[])var8[1]));
>         }
>         if (var1[var7].equals(myMechs[2]) && PolicyUtils.checkPolicy(mechPolicies[2], var5)) {
>             var8 = this.getUserInfo("PLAIN", var2, var6);
>             return new PlainClient(var2, (String)var8[0], (byte[])((byte[])var8[1]));
>         }
>     }
>     return null;
> }
> {code}
>
>
> Note the CallbackHandler passed in from the caller, parameter var6: it is
> Kafka's own SaslClientCallbackHandler. Looking at the source of getUserInfo,
> you can clearly see that a NameCallback and a PasswordCallback are
> constructed and the handle method of SaslClientCallbackHandler is invoked:
> var3.handle(new Callback[]\{var6, var7});
>
> {code:java}
> private Object[] getUserInfo(String var1, String var2, CallbackHandler var3) throws SaslException {
>     if (var3 == null) {
>         throw new SaslException("Callback handler to get username/password required");
>     } else {
>         try {
>             String var4 = var1 + " authentication id: ";
>             String var5 = var1 + " password: ";
>             NameCallback var6 = var2 == null ? new NameCallback(var4) : new NameCallback(var4, var2);
>             PasswordCallback var7 = new PasswordCallback(var5, false);
>             var3.handle(new Callback[]{var6, var7});
>             char[] var8 = var7.getPassword();
>             byte[] var9;
>             if (var8 != null) {
>                 var9 = (new String(var8)).getBytes("UTF8");
>                 var7.clearPassword();
>             } else {
>                 var9 = null;
>             }
>             String var10 = var6.getName();
>             return new Object[]{var10, var9};
>         } catch (IOException var11) {
>             throw new SaslException("Cannot get password", var11);
>         } catch (UnsupportedCallbackException var12) {
>             throw new SaslException("Cannot get userid/password", var12);
>         }
>     }
> }
> {code}
>
>
>
> The most important role of SaslClientCallbackHandler is its
> handle(Callback[] callbacks) method, which takes the username and password
> out of the Subject. Source analysis shows that when the Subject returns each
> kind of credential, *it copies the SecureSet into a HashSet*, and
> *HashSet<String>.iterator().next()* then yields *the first element of that
> HashSet* as the username or password for the corresponding Callback:
>
> {code:java}
> for (Callback callback : callbacks) {
>     if (callback instanceof NameCallback) {
>         NameCallback nc = (NameCallback) callback;
>         if (subject != null && !subject.getPublicCredentials(String.class).isEmpty()) {
>             nc.setName(subject.getPublicCredentials(String.class).iterator().next());
>         } else
>             nc.setName(nc.getDefaultName());
>     } else if (callback instanceof PasswordCallback) {
>         if (subject != null && !subject.getPrivateCredentials(String.class).isEmpty()) {
>             char[] password = subject.getPrivateCredentials(String.class).iterator().next().toCharArray();
>             ((PasswordCallback) callback).setPassword(password);
>         } else {
>             String errorMessage = "Could not login: the client is being asked for a password, but the Kafka" +
>                     " client code does not currently support obtaining a password from the user.";
>             throw new UnsupportedCallbackException(callback, errorMessage);
>         }
>     }
>     .......
> }
> {code}
>
>
> *The key point: when KafkaClient takes the username and password from the
> Subject, it always gets the first element of the HashSet<String> view of
> each credential set, and the iteration order of a HashSet<String> depends on
> the elements'* {color:#ff0000}*hash()*{color} *values.*
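> To make this concrete, here is a small, self-contained sketch (my own
> illustration, not Kafka code) of what SaslClientCallbackHandler effectively
> observes. Two password strings are stored in a Subject, mimicking what
> PlainLoginModule and ScramLoginModule each do in initialize(); which one
> iterator().next() returns is decided by hash order, not by the order of the
> modules in the JAAS file:

```java
import javax.security.auth.Subject;

public class CredentialOrderDemo {
    public static void main(String[] args) {
        Subject subject = new Subject();
        // Mimic PlainLoginModule.initialize() and ScramLoginModule.initialize():
        // each stores its password as a private credential of the shared Subject.
        subject.getPrivateCredentials().add("kJTVDziatPgjXG82sFHc4O1EIuewmlvS"); // PLAIN password
        subject.getPrivateCredentials().add("admin_scram_password");             // SCRAM password

        // SaslClientCallbackHandler effectively does this: the class-filtered
        // view is backed by a HashSet, so "first" is chosen by hash order,
        // not by the order of the LoginModules in kafka_Client_jaas.conf.
        String first = subject.getPrivateCredentials(String.class).iterator().next();
        System.out.println("password handed to the SASL client: " + first);
    }
}
```

> If "admin_scram_password" happens to land in an earlier hash bucket, the
> PLAIN exchange is performed with the SCRAM password, which is exactly the
> failure seen in the broker logs above.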
> h1. KafkaProducer Sasl identity authentication process Debug
>
> h2. Preconditions:
> h3. 1) kafka_server_jaas.conf configuration:
> All broker machines point java.security.auth.login.config at this
> configuration; the brokers start normally, and the Kafka cluster is healthy
> and serving clients:
>
> {code:java}
> KafkaServer {
> org.apache.kafka.common.security.plain.PlainLoginModule required
> username="admin"
> password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
> user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
> user_alice="alice";
> org.apache.kafka.common.security.scram.ScramLoginModule required
> username="admin_scram"
> password="admin_scram_password";
>
> };
> {code}
>
>
> h3. 2) Key KafkaProducer configuration
> {code:java}
> props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "******:9669");
> props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
> System.setProperty("java.security.auth.login.config","******\\kafka_Client_jaas.conf");
> props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
> props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
> "******\\client.truststore.jks");
> props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "******");
> props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
> KafkaProducer<String, String> producer = new KafkaProducer<String,
> String>(props);
> {code}
>
> h3. 3) kafka_Client_jaas.conf file, simulating the JAAS configuration of
> ClientBroker
>
> {code:java}
> KafkaClient {
> org.apache.kafka.common.security.plain.PlainLoginModule required
> username="admin"
> password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
> user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
> user_alice="alice";
> org.apache.kafka.common.security.scram.ScramLoginModule required
> username="admin_scram"
> password="DziatPgjXG82sFHc4O1EIuewmlvS";
>
> };
> {code}
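> (As an aside, not part of the original report: assuming the client needs
> only one mechanism, this ambiguity can be sidestepped by supplying a
> single-module JAAS entry through the sasl.jaas.config client property,
> which takes precedence over the static java.security.auth.login.config
> file, e.g.:)
>
> {code:java}
> props.put(SaslConfigs.SASL_JAAS_CONFIG,
>         "org.apache.kafka.common.security.plain.PlainLoginModule required "
>         + "username=\"admin\" password=\"kJTVDziatPgjXG82sFHc4O1EIuewmlvS\";");
> {code}
>
> With exactly one LoginModule in the client's JaasContext, the Subject only
> ever holds one credential pair. The analysis below, however, is specifically
> about the static JAAS file path.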
>
>
> h3. Debugging the authentication process
> h4. 1) Place a breakpoint on the line "loginSucceeded = true;" in the
> login() method of LoginContext
> See the attached screenshot LoginContext_login_debug.png.
>
> We can see that *the elements of the two credential sets in the Subject are
> added in the order of the LoginModules listed in kafka_Client_jaas.conf.*
>
> h4. 2) Place a breakpoint in handle(Callback[] callbacks) of
> SaslClientCallbackHandler
> See the attached screenshot SaslClientCallbackHandler_handle_debug.png.
>
> The password character array of the PasswordCallback here starts with
> "DziatPgjXG", which corresponds to the password of the ScramLoginModule in
> kafka_Client_jaas.conf: "DziatPgjXG82sFHc4O1EIuewmlvS".
>
> h4. 3) Remove all breakpoints and run the KafkaProducer program
> The producer log is shown below (real broker IPs replaced with ******):
> {code:java}
> [main] INFO org.apache.kafka.common.security.authenticator.AbstractLogin -
> Successfully logged in.
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.0.0
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId:
> 8cb0a5e9d3441962
> [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs:
> 1635534803428
> [kafka-producer-network-thread | producer-1] WARN
> org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1]
> Bootstrap broker ******:9669 (id: -3 rack: null) disconnected
> [kafka-producer-network-thread | producer-1] WARN
> org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1]
> Bootstrap broker ******:9669 (id: -1 rack: null) disconnected
> [kafka-producer-network-thread | producer-1] INFO
> org.apache.kafka.common.network.Selector - [Producer clientId=producer-1]
> Failed authentication with ******/****** (Authentication failed: Invalid
> username or password)
> [kafka-producer-network-thread | producer-1] ERROR
> org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1]
> Connection to node -2 (******/******:9669) failed authentication due to:
> Authentication failed: Invalid username or password
> [kafka-producer-network-thread | producer-1] WARN
> org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1]
> Bootstrap broker ******:9669 (id: -2 rack: null) disconnected
> [main] ERROR ProducerTest - the producer has a error:Authentication failed:
> Invalid username or password
> [kafka-producer-network-thread | producer-1] INFO
> org.apache.kafka.common.network.Selector - [Producer clientId=producer-1]
> Failed authentication with ******/****** (Authentication failed: Invalid
> username or password)
> [kafka-producer-network-thread | producer-1] ERROR
> org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1]
> Connection to node -3 (******/******:9669) failed authentication due to:
> Authentication failed: Invalid username or password
> [kafka-producer-network-thread | producer-1] WARN
> org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1]
> Bootstrap broker ******:9669 (id: -3 rack: null) disconnected
> [main] ERROR ProducerTest - the producer has a error:Authentication failed:
> Invalid username or password
> {code}
>
> The producer log prints the same exception: Authentication failed: Invalid
> username or password.
> ServerBroker expects the PlainLoginModule password
> "kJTVDziatPgjXG82sFHc4O1EIuewmlvS", but the KafkaProducer side sent the
> wrong one: it picked up the ScramLoginModule password
> "DziatPgjXG82sFHc4O1EIuewmlvS".
>
>
> h1. Root Cause:
> h2. 1. The credentials stored in the Subject contain the username & password
> of {color:#ff0000}all LoginModules{color} in kafka_Client_jaas.conf
> The JDK's LoginContext initialization code is as follows:
> {code:java}
> public LoginContext(String name, Subject subject,
> CallbackHandler callbackHandler,
> Configuration config) throws LoginException {
> this.config = config;
> if (config != null) {
> creatorAcc = java.security.AccessController.getContext();
> }
> init(name);
> ....
> }
> {code}
>
>
> When the JAAS configuration is supplied through
> java.security.auth.login.config, the config here is actually of the JDK type
> sun.security.provider.ConfigFile.
> When init(name); above executes, config.getAppConfigurationEntry(name) reads
> all the LoginModules from kafka_Client_jaas.conf again, and the entries
> variable is used to populate the instance variable moduleStack in the
> following code:
>
> {code:java}
> // get the LoginModules configured for this application
> AppConfigurationEntry[] entries =
> config.getAppConfigurationEntry(name);
> if (entries == null) {
> if (sm != null && creatorAcc == null) {
> sm.checkPermission(new AuthPermission
> ("createLoginContext." + OTHER));
> }
> entries = config.getAppConfigurationEntry(OTHER);
> if (entries == null) {
> MessageFormat form = new MessageFormat(ResourcesMgr.getString
> ("No.LoginModules.configured.for.name"));
> Object[] source = {name};
> throw new LoginException(form.format(source));
> }
> }
> moduleStack = new ModuleInfo[entries.length];
> {code}
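> A minimal sketch of why every module ends up initialized (my own
> illustration, using a hand-built Configuration in place of a parsed JAAS
> file): the entry array returned for the "KafkaClient" section contains all
> configured modules, and moduleStack is sized from it.

```java
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag;
import javax.security.auth.login.Configuration;
import java.util.Map;

public class AllModulesDemo {
    public static void main(String[] args) {
        // Stand-in for the parsed kafka_Client_jaas.conf: a "KafkaClient"
        // entry listing BOTH login modules, as in the report.
        Configuration config = new Configuration() {
            @Override
            public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
                return new AppConfigurationEntry[] {
                    new AppConfigurationEntry(
                        "org.apache.kafka.common.security.plain.PlainLoginModule",
                        LoginModuleControlFlag.REQUIRED, Map.of("username", "admin")),
                    new AppConfigurationEntry(
                        "org.apache.kafka.common.security.scram.ScramLoginModule",
                        LoginModuleControlFlag.REQUIRED, Map.of("username", "admin_scram"))
                };
            }
        };
        // LoginContext.init(name) sizes moduleStack from this array, so the
        // initialize() of every listed module runs and writes its credentials
        // into the one shared Subject.
        int moduleCount = config.getAppConfigurationEntry("KafkaClient").length;
        System.out.println("login modules that will be initialized: " + moduleCount);
    }
}
```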
>
> *Eventually, the* Credentials *stored in the* Subject *will contain the
> username & password of all* LoginModules *in kafka_Client_jaas.conf.*
> h2. 2. In the JDK's Subject class, the order in which elements are retrieved
> from the two credential sets may differ from the order in which they were
> stored; retrieval order depends on the {color:#ff0000}hash index{color} of
> each element in the backing HashSet.
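A small, self-contained illustration of that claim (the password strings are placeholders): HashSet iterates in hash-bucket order, not insertion order, so iterator().next() may return either credential.

```java
import java.util.HashSet;
import java.util.Set;

public class HashSetOrderDemo {
    public static void main(String[] args) {
        Set<String> credentials = new HashSet<>();
        // Insertion order: PLAIN password first, SCRAM password second
        credentials.add("plain-password");
        credentials.add("scram-password");
        // iterator().next() returns whichever element lands in the first
        // occupied hash bucket -- not necessarily the first one added
        System.out.println(credentials.iterator().next());
    }
}
```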
>
> h1. Suggestion & Solutions:
>
> h2. 1. Modify the constructors of the JaasContext and JaasConfig classes
> h3. 1) When KafkaClient initializes Map<String, JaasContext> jaasContexts,
> the JaasContext constructor should receive the clientSaslMechanism configured
> on the KafkaClient and use it to filter the entries returned by
> configuration.getAppConfigurationEntry(name). When clientSaslMechanism =
> PLAIN, the final configurationEntries instance variable should contain only
> the PlainLoginModule entry from kafka_Client_jaas.conf; when
> clientSaslMechanism = SCRAM-SHA-256, configurationEntries should contain only
> the ScramLoginModule entry.
> {code:java}
> public JaasContext(String name, Type type, Configuration configuration,
>                    Password dynamicJaasConfig) {
>     this.name = name;
>     this.type = type;
>     this.configuration = configuration;
>     AppConfigurationEntry[] entries =
>             configuration.getAppConfigurationEntry(name);
>     if (entries == null)
>         throw new IllegalArgumentException("Could not find a '" + name +
>                 "' entry in this JAAS configuration.");
>
>     // Add here the code that uses clientSaslMechanism to verify and
>     // filter entries
>
>     this.configurationEntries = Collections.unmodifiableList(
>             new ArrayList<>(Arrays.asList(entries)));
>     this.dynamicJaasConfig = dynamicJaasConfig;
> }
> {code}
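One possible shape for the filtering step that the placeholder comment refers to (a hypothetical helper, not existing Kafka code; the mechanism-to-module-class mapping is my assumption):

```java
import javax.security.auth.login.AppConfigurationEntry;
import java.util.Arrays;

public class SaslMechanismEntryFilter {
    // Hypothetical mapping from SASL mechanism to JAAS login module class
    static String moduleClassFor(String mechanism) {
        if ("PLAIN".equals(mechanism))
            return "org.apache.kafka.common.security.plain.PlainLoginModule";
        if (mechanism.startsWith("SCRAM-"))
            return "org.apache.kafka.common.security.scram.ScramLoginModule";
        throw new IllegalArgumentException("Unsupported mechanism: " + mechanism);
    }

    // Keep only the entries whose login module matches the client mechanism
    static AppConfigurationEntry[] filter(AppConfigurationEntry[] entries,
                                          String clientSaslMechanism) {
        String module = moduleClassFor(clientSaslMechanism);
        return Arrays.stream(entries)
                .filter(e -> module.equals(e.getLoginModuleName()))
                .toArray(AppConfigurationEntry[]::new);
    }
}
```

The JaasContext constructor could then call filter(entries, clientSaslMechanism) before building configurationEntries.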
> This has two advantages:
> (1) For mode == Mode.CLIENT, even when the JAAS configuration is supplied
> through java.security.auth.login.config, the JaasContext of a KafkaClient
> (KafkaProducer, KafkaConsumer, etc.) gains the same semantics as the
> broker-side per-mechanism configuration
> "listener.name.\{listenerName}.\{saslMechanism}.sasl.jaas.config"; see
> [https://kafka.apache.org/documentation/#security_jaas_broker].
> The goal: in Map<String, JaasContext> jaasContexts, the configurationEntries
> of the JaasContext for each saslMechanism contain only the LoginModule entry
> belonging to that saslMechanism.
> (2) For mode == Mode.SERVER, consider
> SaslChannelBuilder#configure(Map<String, ?> configs):
>
> {code:java}
> public void configure(Map<String, ?> configs) throws KafkaException {
>     try {
>         this.configs = configs;
>         if (mode == Mode.SERVER) {
>             createServerCallbackHandlers(configs);
>             createConnectionsMaxReauthMsMap(configs);
>         } else
>             createClientCallbackHandler(configs);
>         for (Map.Entry<String, AuthenticateCallbackHandler> entry :
>                 saslCallbackHandlers.entrySet()) {
>             String mechanism = entry.getKey();
>             entry.getValue().configure(configs, mechanism,
>                     jaasContexts.get(mechanism).configurationEntries());
>         }
>         // ...
> }
> {code}
> Because configurationEntries has already been validated in
> jaasContexts.get(mechanism), when entry.getValue().configure(...) runs (in
> particular when PlainServerCallbackHandler executes configure(...)), the
> jaasConfigEntries instance variable of PlainServerCallbackHandler becomes
> clean: it no longer contains the entries of other LoginModules.
> h3. 2) The JaasConfig class needs a new constructor
> In the JaasContext constructor, the Configuration configuration parameter is
> uniformly converted into a JaasConfig, because we cannot change the behavior
> of the init(String name) method of the JDK's LoginContext class.
> {code:java}
> public JaasConfig(String loginContextName,
>                   List<AppConfigurationEntry> configurationEntries) {
>     this.loginContextName = loginContextName;
>     if (configurationEntries == null || configurationEntries.size() == 0)
>         throw new IllegalArgumentException(
>                 "JAAS config property does not contain any login modules");
>     this.configEntries = configurationEntries;
> }
> {code}
> Then, in the JaasContext constructor, use the validated and filtered
> configurationEntries to rebuild the configuration:
>
> {code:java}
> public JaasContext(String name, Type type, Configuration configuration,
>                    Password dynamicJaasConfig) {
>     this.name = name;
>     this.type = type;
>     AppConfigurationEntry[] entries =
>             configuration.getAppConfigurationEntry(name);
>     if (entries == null)
>         throw new IllegalArgumentException("Could not find a '" + name +
>                 "' entry in this JAAS configuration.");
>
>     // Add here the code that uses clientSaslMechanism to verify and
>     // filter entries
>
>     this.configurationEntries = Collections.unmodifiableList(
>             new ArrayList<>(Arrays.asList(entries)));
>
>     if (configuration instanceof JaasConfig)
>         this.configuration = configuration;
>     else
>         this.configuration = new JaasConfig(name, configurationEntries);
>
>     this.dynamicJaasConfig = dynamicJaasConfig;
> }
> {code}
>
> h2. 2. Each LoginModule class in kafka_Client_jaas.conf should be configured
> only once
> For example, with the following configuration, an exception should be thrown
> from the JaasContext constructor when the JaasContext is loaded:
>
> {code:java}
> KafkaClient {
>     org.apache.kafka.common.security.plain.PlainLoginModule required
>     username="admin"
>     password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>     user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>     user_alice="alice";
>
>     org.apache.kafka.common.security.plain.PlainLoginModule required
>     username="admin2"
>     password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>     user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>     user_tom="tom";
>
>     org.apache.kafka.common.security.scram.ScramLoginModule required
>     username="admin_scram"
>     password="DziatPgjXG82sFHc4O1EIuewmlvS";
> };
> {code}
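A sketch of the validation that could throw here (the class and method names are mine, not Kafka's):

```java
import javax.security.auth.login.AppConfigurationEntry;
import java.util.HashSet;
import java.util.Set;

public class DuplicateLoginModuleCheck {
    // Reject a JAAS context that lists the same login module class twice
    static void validate(AppConfigurationEntry[] entries) {
        Set<String> seen = new HashSet<>();
        for (AppConfigurationEntry entry : entries) {
            if (!seen.add(entry.getLoginModuleName()))
                throw new IllegalArgumentException(
                        "Login module configured more than once: "
                                + entry.getLoginModuleName());
        }
    }
}
```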
>
>
> h2. 3. Discussion of the Subject class in the JDK
> When the JDK's Subject class returns credentials, its internal SecureSet is
> copied into a plain HashSet, which discards insertion order; I do not know
> the JDK's reasoning here. We could file an issue with the JDK proposing that
> an order-preserving set such as LinkedHashSet be used instead.
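For comparison, a LinkedHashSet really does hand back elements in insertion order (the password strings are placeholders):

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class LinkedHashSetOrderDemo {
    public static void main(String[] args) {
        Set<String> credentials = new LinkedHashSet<>();
        credentials.add("plain-password"); // added first
        credentials.add("scram-password"); // added second
        // LinkedHashSet iterates in insertion order, so the "first
        // element" really is the first credential that was stored
        System.out.println(credentials.iterator().next()); // prints "plain-password"
    }
}
```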
> But I recommend making the change on the application side, at the Kafka
> level.
> First of all, in both the Map<String, JaasContext> jaasContexts and the
> Map<String, LoginManager> loginManagers variables, the key is the mechanism.
> Therefore, *the value for each mechanism should be pure and should not
> contain the LoginModule entries of other mechanisms*. This also guarantees
> that no credential set in the Subject holds more than one element.
> Secondly, when the JDK designed Subject and its credential sets, it probably
> attached no meaning to the "*first element*". What the JDK promises is to
> return every element of the credential set and to let you modify the
> returned copy.
> See the Javadoc of the getPrivateCredentials method:
> {code:java}
> Return a Set of private credentials associated with this Subject that are
> instances or subclasses of the specified Class.
> The caller must have permission to access all of the requested Credentials,
> or a SecurityException will be thrown.
> The returned Set is not backed by this Subject's internal private Credential
> Set. A new Set is created and returned for each method invocation.
> Modifications to the returned Set will not affect the internal private
> Credential Set.
> Params:
> c – the returned Set of private credentials will all be instances of this
> class.
> Type parameters:
> <T> – the type of the class modeled by c
> Returns:
> a Set of private credentials that are instances of the specified Class.
> Throws:
> NullPointerException – if the specified Class is null.
> public <T> Set<T> getPrivateCredentials(Class<T> c)
> {code}
>
> However, when Kafka takes an element from the two credential sets of the
> Subject, it uses "*take the first element*" semantics, which conflicts with
> the JDK's intent in designing Subject: for authentication you should
> consider all of the elements of each credential set, not just one.
> {code:java}
> subject.getPrivateCredentials(String.class).iterator().next()
> {code}
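A minimal sketch of the difference (the Subject is populated by hand here; in reality the login modules commit the credentials, and the string equality check stands in for a real password check):

```java
import javax.security.auth.Subject;

public class AllCredentialsDemo {
    public static void main(String[] args) {
        Subject subject = new Subject();
        subject.getPrivateCredentials().add("plain-password");
        subject.getPrivateCredentials().add("scram-password");

        // Kafka today: only the "first" element of an unordered set
        String first = subject.getPrivateCredentials(String.class)
                .iterator().next();

        // Alternative: try every credential until one authenticates
        boolean authenticated = false;
        for (String candidate : subject.getPrivateCredentials(String.class)) {
            authenticated = authenticated || "plain-password".equals(candidate);
        }
        System.out.println(first + " / " + authenticated);
    }
}
```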
>
>
>