[ https://issues.apache.org/jira/browse/KAFKA-13422?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17490622#comment-17490622 ]
RivenSun commented on KAFKA-13422: ---------------------------------- [~guozhang] Thank you for your reply [~rsivaram] [~ijuma] could you give any suggestions? Thanks. > Even if the correct username and password are configured, when ClientBroker > or KafkaClient tries to establish a SASL connection to ServerBroker, an > exception is thrown: (Authentication failed: Invalid username or password) > ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ > > Key: KAFKA-13422 > URL: https://issues.apache.org/jira/browse/KAFKA-13422 > Project: Kafka > Issue Type: Bug > Components: clients, core > Affects Versions: 2.7.1, 3.0.0 > Reporter: RivenSun > Priority: Major > Attachments: CustomerAuthCallbackHandler.java, > LoginContext_login_debug.png, SaslClientCallbackHandler_handle_debug.png > > > > h1. Foreword: > When deploying a Kafka cluster with a higher version (2.7.1), I encountered > an exception of communication identity authentication failure between > brokers. In the current latest version 3.0.0, this problem can also be > reproduced. > h1. Problem recurring: > h2. 1)broker Version is 3.0.0 > h3. The content of kafka_server_jaas.conf of each broker is exactly the same, > the content is as follows: > > > {code:java} > KafkaServer { > org.apache.kafka.common.security.plain.PlainLoginModule required > username="admin" > password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS" > user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS" > user_alice="alice"; > org.apache.kafka.common.security.scram.ScramLoginModule required > username="admin_scram" > password="admin_scram_password"; > > }; > {code} > > > h3. broker server.properties: > One of the broker configuration files is provided, and the content of the > configuration files of other brokers is only different from the localPublicIp > of advertised.listeners. 
> > {code:java} > broker.id=1 > broker.rack=us-east-1a > advertised.listeners=SASL_PLAINTEXT://localPublicIp:9779,SASL_SSL://localPublicIp:9889,INTERNAL_SSL://:9009,PLAIN_PLUGIN_SSL://localPublicIp:9669 > log.dirs=/asyncmq/kafka/data_1,/asyncmq/kafka/data_2 > zookeeper.connect=*** > listeners=SASL_PLAINTEXT://:9779,SASL_SSL://:9889,INTERNAL_SSL://:9009,PLAIN_PLUGIN_SSL://:9669 > listener.security.protocol.map=INTERNAL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,PLAIN_PLUGIN_SSL:SASL_SSL > listener.name.plain_plugin_ssl.plain.sasl.server.callback.handler.class=org.apache.kafka.common.security.plain.internals.PlainServerCallbackHandler > #ssl config > ssl.keystore.password=*** > ssl.key.password=*** > ssl.truststore.password=*** > ssl.keystore.location=*** > ssl.truststore.location=*** > ssl.client.auth=none > ssl.endpoint.identification.algorithm= > #broker communicate config > #security.inter.broker.protocol=SASL_PLAINTEXT > inter.broker.listener.name=INTERNAL_SSL > sasl.mechanism.inter.broker.protocol=PLAIN > #sasl authentication config > sasl.kerberos.service.name=kafka > sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,GSSAPI > delegation.token.master.key=*** > delegation.token.expiry.time.ms=86400000 > delegation.token.max.lifetime.ms=3153600000000 > {code} > > > Then start all brokers at the same time. Each broker has actually been > started successfully, but when establishing a connection between the > controller node and all brokers, the identity authentication has always > failed. The connection between brokers cannot be established normally, > causing the entire Kafka cluster to be unable to provide external services. > h3. 
The server log keeps printing abnormally like crazy: > The real ip sensitive information of the broker in the log, I use ****** > instead of here > > {code:java} > [2021-10-29 14:16:19,831] INFO [SocketServer listenerType=ZK_BROKER, > nodeId=3] Started socket server acceptors and processors > (kafka.network.SocketServer) > [2021-10-29 14:16:19,836] INFO Kafka version: 3.0.0 > (org.apache.kafka.common.utils.AppInfoParser) > [2021-10-29 14:16:19,836] INFO Kafka commitId: 8cb0a5e9d3441962 > (org.apache.kafka.common.utils.AppInfoParser) > [2021-10-29 14:16:19,836] INFO Kafka startTimeMs: 1635516979831 > (org.apache.kafka.common.utils.AppInfoParser) > [2021-10-29 14:16:19,837] INFO [KafkaServer id=3] started > (kafka.server.KafkaServer) > [2021-10-29 14:16:20,249] INFO [SocketServer listenerType=ZK_BROKER, > nodeId=3] Failed authentication with /****** (Authentication failed: Invalid > username or password) (org.apache.kafka.common.network.Selector) > [2021-10-29 14:16:20,680] INFO [SocketServer listenerType=ZK_BROKER, > nodeId=3] Failed authentication with /****** (Authentication failed: Invalid > username or password) (org.apache.kafka.common.network.Selector) > [2021-10-29 14:16:21,109] INFO [SocketServer listenerType=ZK_BROKER, > nodeId=3] Failed authentication with /****** (Authentication failed: Invalid > username or password) (org.apache.kafka.common.network.Selector) > {code} > > > h2. 
2)Try to change the password of the PlainLoginModule communication > between brokers in kafka_server_jaas.conf > change is kJTVDziatPgjXG82sFHc4O1EIuewmlvS --> DziatPgjXG82sFHc4O1EIuewmlvS > > {code:java} > KafkaServer { > org.apache.kafka.common.security.plain.PlainLoginModule required > username="admin" > password="DziatPgjXG82sFHc4O1EIuewmlvS" > user_admin="DziatPgjXG82sFHc4O1EIuewmlvS" > user_alice="alice"; > org.apache.kafka.common.security.scram.ScramLoginModule required > username="admin_scram" > password="admin_scram_password"; > > }; > {code} > > > > Restart all broker machines and find that connections can be established > normally between the controller and all brokers, which can be verified by the > netstat -anp | grep 9009 command > Real ip sensitive information, I use ****** instead > > {code:java} > [root@ip-10-30-0-64 kafka]# netstat -anp | grep 9009 > tcp6 0 0 :::9009 :::* LISTEN > 24852/java > tcp6 0 0 ******:47502 ******:9009 ESTABLISHED > 24852/java > tcp6 0 0 ******:9009 ******:41164 ESTABLISHED > 24852/java > tcp6 0 0 ******:9009 ******:41168 ESTABLISHED > 24852/java > {code} > > The entire cluster can provide external services, which can be verified by > creating a topic through the script bin/kafka-topics.sh. > h1. Preliminary guess: > 1)Does the admin password kJTVDziatPgjXG82sFHc4O1EIuewmlvS contain special > characters not allowed by Kafka password? > 2)Whether the content of the kafka_server_jaas.conf file does not conform to > the standard format of the Kafka official website? > 3)Whether the end newline character of each line in kafka_server_jaas.conf > does not conform to the newline character of the Linux system? 
> After consulting data and analysis, the above conjectures are not correct > 1)kJTVDziatPgjXG82sFHc4O1EIuewmlvS does not contain special characters not > allowed by Kafka password > 2)The content of the kafka_server_jaas.conf file conforms to the standard > format of Kafka official website, please refer > to[https://kafka.apache.org/documentation/#security_sasl] > 3)Open the kafka_server_jaas.conf file through the vim command, and through > the :set list command, you can see that the end newline character of each > line is the standard linux system file newline character > > So in order to improve the analysis of the reasons, try to think that the log > keeps outputting Invalid username or password, what is the username&password > passed by ClientBroker? What is the username&password expected by > ServerBroker? > > h1. sasl.server.callback.handler.class implement > > Refer to the > [sasl.server.callback.handler.class|https://kafka.apache.org/documentation/#brokerconfigs_sasl.server.callback.handler.class] > parameter and write the implementation class of the interface > AuthenticateCallbackHandler :CustomerAuthCallbackHandler. > In fact, the code implementation of CustomerAuthCallbackHandler is almost > exactly the same as Kafka's own default implementation class > PlainServerCallbackHandler, except that the log output is temporarily added > to the native authenticate method. The complete code is included in the > attachment"CustomerAuthCallbackHandler.java" > > {code:java} > private final static Logger log = > LoggerFactory.getLogger(CustomerAuthCallbackHandler.class); > ...... 
> protected boolean authenticate(String username, char[] password) throws > IOException { > if (username == null) > return false; > else { > String expectedPassword = > JaasContext.configEntryOption(jaasConfigEntries, > JAAS_USER_PREFIX + username, > PlainLoginModule.class.getName()); > boolean authenticateSuccess = expectedPassword != null && > Utils.isEqualConstantTime(password, expectedPassword.toCharArray()); > log.info("CustomerAuthCallbackHandler authenticate [{}] | user > [{}] password is [{}] , expectedPassword is [{}] ", authenticateSuccess, > username, new String(password), expectedPassword); > return authenticateSuccess; > } > } > {code} > > > > > Each broker's configuration file server.properties adds a new line of > configuration > > {code:java} > listener.name.internal_ssl.plain.sasl.server.callback.handler.class=us.zoom.mq.security.plain.CustomerAuthCallbackHandler > {code} > > Rollback the password of admin in the content of kafka_server_jaas.conf, the > content is as follows: > > {code:java} > KafkaServer { > org.apache.kafka.common.security.plain.PlainLoginModule required > username="admin" > password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS" > user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS" > user_alice="alice"; > org.apache.kafka.common.security.scram.ScramLoginModule required > username="admin_scram" > password="admin_scram_password"; > > }; > {code} > > > Restart all brokers, you can observe the log: > The real ip sensitive information of the broker in the log, I use ****** > instead of here > > {code:java} > [2021-10-29 15:31:09,886] INFO [SocketServer listenerType=ZK_BROKER, > nodeId=3] Started data-plane acceptor and processor(s) for endpoint : > ListenerName(SASL_PLAINTEXT) (kafka.network.SocketServer) > [2021-10-29 15:31:09,925] INFO [SocketServer listenerType=ZK_BROKER, > nodeId=3] Started data-plane acceptor and processor(s) for endpoint : > ListenerName(PLAIN_PLUGIN_SSL) (kafka.network.SocketServer) > [2021-10-29 15:31:09,926] INFO 
[SocketServer listenerType=ZK_BROKER, > nodeId=3] Started socket server acceptors and processors > (kafka.network.SocketServer) > [2021-10-29 15:31:09,932] INFO Kafka version: 3.0.0 > (org.apache.kafka.common.utils.AppInfoParser) > [2021-10-29 15:31:09,932] INFO Kafka commitId: 8cb0a5e9d3441962 > (org.apache.kafka.common.utils.AppInfoParser) > [2021-10-29 15:31:09,932] INFO Kafka startTimeMs: 1635521469926 > (org.apache.kafka.common.utils.AppInfoParser) > [2021-10-29 15:31:09,933] INFO [KafkaServer id=3] started > (kafka.server.KafkaServer) > [2021-10-29 15:31:10,305] INFO CustomerAuthCallbackHandler authenticate > [false] | user [admin] password is [admin_scram_password] , expectedPassword > is [kJTVDziatPgjXG82sFHc4O1EIuewmlvS] > (us.zoom.mq.security.plain.CustomerAuthCallbackHandler) > [2021-10-29 15:31:10,306] INFO [SocketServer listenerType=ZK_BROKER, > nodeId=3] Failed authentication with /****** (Authentication failed: Invalid > username or password) (org.apache.kafka.common.network.Selector) > [2021-10-29 15:31:10,734] INFO CustomerAuthCallbackHandler authenticate > [false] | user [admin] password is [admin_scram_password] , expectedPassword > is [kJTVDziatPgjXG82sFHc4O1EIuewmlvS] > (us.zoom.mq.security.plain.CustomerAuthCallbackHandler) > [2021-10-29 15:31:10,735] INFO [SocketServer listenerType=ZK_BROKER, > nodeId=3] Failed authentication with /****** (Authentication failed: Invalid > username or password) (org.apache.kafka.common.network.Selector) > [2021-10-29 15:31:11,165] INFO CustomerAuthCallbackHandler authenticate > [false] | user [admin] password is [admin_scram_password] , expectedPassword > is [kJTVDziatPgjXG82sFHc4O1EIuewmlvS] > (us.zoom.mq.security.plain.CustomerAuthCallbackHandler) > [2021-10-29 15:31:11,165] INFO [SocketServer listenerType=ZK_BROKER, > nodeId=3] Failed authentication with /****** (Authentication failed: Invalid > username or password) (org.apache.kafka.common.network.Selector) > [2021-10-29 15:31:11,596] INFO 
CustomerAuthCallbackHandler authenticate
> [false] | user [admin] password is [admin_scram_password] , expectedPassword
> is [kJTVDziatPgjXG82sFHc4O1EIuewmlvS]
> (us.zoom.mq.security.plain.CustomerAuthCallbackHandler)
> {code}
>
>
> The log makes the reason for the failed authentication clear:
> when the connection between brokers is established, the username passed by
> ClientBroker is correct, but password and expectedPassword do not
> match. The password that was sent is the one configured for the
> ScramLoginModule in the kafka_server_jaas.conf file.
> At this point we only know that the password
> value passed by ClientBroker is wrong, not why
> it is wrong. To find the root cause, we have to read the source
> code of the Kafka server startup process.
>
> h1. Kafka server Startup process source code analysis
> The startup entry point is the main method of the Scala class
> kafka.Kafka in the core project.
> KafkaServer's startup() method initializes many key
> modules, such as logManager, socketServer, kafkaController, tokenManager,
> and groupCoordinator.
> The key pieces of code in KafkaServer's startup() method for this
> problem are as follows:
> 1)socketServer.startup(startProcessingRequests = false)
> This method creates the Acceptor and Processors for
> ControlPlane and DataPlane
>
> 2)socketServer.startProcessingRequests(authorizerFutures)
> This method starts the Acceptor and Processors for ControlPlane
> and DataPlane
>
> 3)Trace further into the startAcceptorAndProcessors method in SocketServer
> This method starts all Processor threads for each listener.name
> See method startProcessors(processors: Seq[Processor], processorThreadPrefix:
> String)
>
> 4)Analyze configureNewConnections() in the run method of the Processor thread
> class
> There is a line of key code in this > 
method: selector.register(connectionId(channel.socket), channel)
> Tracing further down from there:
> registerChannel(id, socketChannel, SelectionKey.OP_READ) -->
> KafkaChannel channel = buildAndAttachKafkaChannel(socketChannel, id, key) -->
> SaslChannelBuilder.buildChannel(...)
>
> 5)The method buildChannel(...) of SaslChannelBuilder
> In this method, the value of Mode determines whether
> buildServerAuthenticator or buildClientAuthenticator is called.
> The critical difference between the two is that the former calls
> createSaslServer and the latter calls createSaslClient.
> SaslServer is responsible for verifying user passwords on ServerBroker; for the
> logic PlainLoginModule uses to verify passwords, see the default
> implementation class PlainServerCallbackHandler.
> SaslClient is responsible for obtaining the user's password on the KafkaClient side.
> Since the wrong password comes from the client side, our focus should be
> buildClientAuthenticator, which means the instance
> variable Mode mode of SaslChannelBuilder is CLIENT.
> So we go back and trace the constructor: public SaslChannelBuilder(...)
> 6)The constructor public SaslChannelBuilder(...)
> This constructor is only called from the private static ChannelBuilder create(...)
> method of ChannelBuilders.
> Tracing further up leads to static ChannelBuilder clientChannelBuilder(...)
> At this point you can see that the caller of the
> clientChannelBuilder(...) method is either ClientBroker or the ClientUtils class.
> The latter is used by KafkaProducer/KafkaConsumer/KafkaAdminClient.
> So, to analyze the root cause further, I decided to use KafkaProducer
> to simulate the ClientBroker connection request. Their underlying mechanisms are
> almost the same, except that some configuration items are only supported by
> ClientBroker and the section name of the static JAAS configuration is different.
> h1. 
Source code analysis of KafkaProducer's SASL authentication process
>
> ClientUtils#clientChannelBuilder(...) → ChannelBuilders#create(...) →
> SaslChannelBuilder#configure(Map<String, ?> configs)
> The key operations performed in the above methods are:
> 1)Loading the JaasContext
> If the kafka_Client_jaas.conf file is specified through the
> java.security.auth.login.config system property, it is loaded
> into JaasContext through the method getAppConfigurationEntry(String var1) of the
> sun.security.provider.ConfigFile class.
>
> {code:java}
> public AppConfigurationEntry[] getAppConfigurationEntry(String var1) {
>     return this.spi.engineGetAppConfigurationEntry(var1);
> }
> {code}
>
>
> 2)createClientCallbackHandler(Map<String, ?> configs)
> The default client callback handler implementation class is
> SaslClientCallbackHandler. Its role is to let
> KafkaClient take the username and password from the authentication
> credentials saved in the Subject held by the LoginManager of its own Channel. *This is
> particularly critical, and we will analyze it in detail later.*
> 3)Initialization of LoginManager and loading of LoginContext
> Here we directly trace the constructor of LoginManager:
> private LoginManager(...) → AbstractLogin#login()
> There are two key operations in the AbstractLogin#login() method
>
> {code:java}
> @Override
> public LoginContext login() throws LoginException {
>     loginContext = new LoginContext(contextName, null,
> loginCallbackHandler, configuration);
>     loginContext.login();
>     log.info("Successfully logged in.");
>     return loginContext;
> }
> {code}
>
> The role of new LoginContext(...) 
is to:
> (1)Get the configuration variable in JaasContext, where all the credentials
> of kafka_Client_jaas.conf are stored
> (2)Initialize the instance variable moduleStack array
>
> loginContext.login() → invokePriv(LOGIN_METHOD) → invoke(String methodName)
> These methods:
> (1)Update the Object module field of each element of the moduleStack array
> (2)Obtain all public methods of each LoginModule class through
> reflection
>
> {code:java}
> methods = moduleStack[i].module.getClass().getMethods();
> {code}
>
> Then look up the index of the initialize method in the methods array via the
> INIT_METHOD constant,
> and execute the initialize method of each LoginModule class through
> reflection
>
> {code:java}
> methods[mIndex].invoke(moduleStack[i].module, initArgs);
> {code}
>
> This step is very important. In invokePriv(LOGIN_METHOD), by
> looping over moduleStack, LoginContext executes the
> initialize method of every LoginModule class you configured, and the
> initialize methods of PlainLoginModule and ScramLoginModule load the
> username & password configured in kafka_Client_jaas.conf into the
> Subject of the LoginManager corresponding to the Channel. The two fields
> of Subject that store usernames and passwords are both of type
> SecureSet, and SecureSet uses a LinkedList to store its
> elements. *So the key point: the elements of the two Credentials sets
> in the Subject are added in the order in which the LoginModules appear in
> kafka_Client_jaas.conf*
> 4)Let's go back and analyze the SaslClientCallbackHandler mentioned in
> step 2) above
> Where is this class used? It is used when KafkaClient is about to initiate
> a connection:
> ClientFactoryImpl#createSaslClient(...) will use SaslClientCallbackHandler. 
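The storage order described in step 3) can be checked against the JDK directly. The following is a minimal sketch, not Kafka code: the class name SubjectCredentialOrderDemo and the placeholder passwords "b" and "a" are made up for illustration (single characters were chosen because their hash codes, 98 and 97, make the HashSet bucket order easy to predict). It assumes the Subject implementation of current OpenJDK releases, where the untyped getPrivateCredentials() view keeps insertion order while the typed getPrivateCredentials(String.class) view is a copy backed by a HashSet.

```java
import javax.security.auth.Subject;

class SubjectCredentialOrderDemo {

    // Builds a Subject and adds the passwords in the given order,
    // the way successive LoginModules would during login.
    public static Subject subjectWith(String... passwords) {
        Subject subject = new Subject();
        for (String password : passwords) {
            subject.getPrivateCredentials().add(password);
        }
        return subject;
    }

    // First element of the untyped view: follows insertion order.
    public static Object firstStored(Subject subject) {
        return subject.getPrivateCredentials().iterator().next();
    }

    // First element of the typed view: follows HashSet iteration order,
    // which depends on the hash of each element, not on insertion order.
    public static String firstRead(Subject subject) {
        return subject.getPrivateCredentials(String.class).iterator().next();
    }

    public static void main(String[] args) {
        // "b" is stored first, as if it came from the first LoginModule.
        Subject subject = subjectWith("b", "a");
        System.out.println("first stored: " + firstStored(subject));
        System.out.println("first read:   " + firstRead(subject));
    }
}
```

On OpenJDK this should report "b" as the first stored credential and "a" as the first one read, because "a" lands in a lower HashSet bucket than "b". With real passwords the winner is equally deterministic but much harder to predict, which matches the behaviour reported in this issue.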
> The call stack is:
> NetworkClient#initiateConnect(Node node, long now)
> -->
> Selector#connect(String id, InetSocketAddress address, int sendBufferSize,
> int receiveBufferSize)
> -->
> Selector#registerChannel(String id, SocketChannel socketChannel, int
> interestedOps)
> -->
> Selector#buildAndAttachKafkaChannel(SocketChannel socketChannel, String id,
> SelectionKey key)
> -->
> SaslChannelBuilder#buildChannel(...)
> -->
> SaslChannelBuilder#buildClientAuthenticator
> -->
> SaslClientAuthenticator#createSaslClient()
> -->
> Sasl.createSaslClient(...)
> -->
> ClientFactoryImpl#createSaslClient(...)
> The source code of ClientFactoryImpl#createSaslClient(...) is as
> follows:
> {code:java}
> public SaslClient createSaslClient(String[] var1, String var2, String var3,
> String var4, Map<String, ?> var5, CallbackHandler var6) throws SaslException {
>     for(int var7 = 0; var7 < var1.length; ++var7) {
>         if (var1[var7].equals(myMechs[0]) &&
> PolicyUtils.checkPolicy(mechPolicies[0], var5)) {
>             return new ExternalClient(var2);
>         }
>         Object[] var8;
>         if (var1[var7].equals(myMechs[1]) &&
> PolicyUtils.checkPolicy(mechPolicies[1], var5)) {
>             var8 = this.getUserInfo("CRAM-MD5", var2, var6);
>             return new CramMD5Client((String)var8[0],
> (byte[])((byte[])var8[1]));
>         }
>         if (var1[var7].equals(myMechs[2]) &&
> PolicyUtils.checkPolicy(mechPolicies[2], var5)) {
>             var8 = this.getUserInfo("PLAIN", var2, var6);
>             return new PlainClient(var2, (String)var8[0],
> (byte[])((byte[])var8[1]));
>         }
>     }
>     return null;
> }
> {code}
>
>
> The CallbackHandler passed in from the upper layer is the
> parameter var6; it is in fact Kafka's own SaslClientCallbackHandler. Next,
> look at the source code of getUserInfo. 
You can clearly see that > NameCallback and PasswordCallback are constructed, and the handle method of > SaslClientCallbackHandler is executed: var3.handle(new Callback[]\{var6, > var7}); > > {code:java} > private Object[] getUserInfo(String var1, String var2, CallbackHandler var3) > throws SaslException { > if (var3 == null) { > throw new SaslException("Callback handler to get > username/password required"); > } else { > try { > String var4 = var1 + " authentication id: "; > String var5 = var1 + " password: "; > NameCallback var6 = var2 == null ? new NameCallback(var4) : > new NameCallback(var4, var2); > PasswordCallback var7 = new PasswordCallback(var5, false); > var3.handle(new Callback[]{var6, var7}); > char[] var8 = var7.getPassword(); > byte[] var9; > if (var8 != null) { > var9 = (new String(var8)).getBytes("UTF8"); > var7.clearPassword(); > } else { > var9 = null; > } > String var10 = var6.getName(); > return new Object[]{var10, var9}; > } catch (IOException var11) { > throw new SaslException("Cannot get password", var11); > } catch (UnsupportedCallbackException var12) { > throw new SaslException("Cannot get userid/password", var12); > } > } > } > {code} > > > > The most important role of SaslClientCallbackHandler is its handle(Callback[] > callbacks) method, which takes out username and password from Subject > By analyzing the source code analysis, when Subject takes elements from each > type of Credentials, *it will change the data structure of the Credentials > SecureSet into a HashSet*, and then call *HashSet<String>.iterator().next()* > to get *the first* item of each type of HashSet Elements, as username and > password in the corresponding Callback > > {code:java} > for (Callback callback : callbacks) { > if (callback instanceof NameCallback) { > NameCallback nc = (NameCallback) callback; > if (subject != null && > !subject.getPublicCredentials(String.class).isEmpty()) { > > nc.setName(subject.getPublicCredentials(String.class).iterator().next()); > } 
else > nc.setName(nc.getDefaultName()); > } else if (callback instanceof PasswordCallback) { > if (subject != null && > !subject.getPrivateCredentials(String.class).isEmpty()) { > char[] password = > subject.getPrivateCredentials(String.class).iterator().next().toCharArray(); > ((PasswordCallback) callback).setPassword(password); > } else { > String errorMessage = "Could not login: the client is > being asked for a password, but the Kafka" + > " client code does not currently support > obtaining a password from the user."; > throw new UnsupportedCallbackException(callback, > errorMessage); > } > } > > ....... > } > {code} > > > *The key point is: when KafkaClient takes the username and password from the* > *Subject, it must be the first element in the HashSet<String>* *of the > content of each* *Credentials* *element, and the index order of the elements > in the HashSet<String> depends on the element's* {color:#ff0000}*hash( > )*{color} *value.* > h1. KafkaProducer Sasl identity authentication process Debug > > h2. Precondition: > h3. 1)kafka_server_jaas.conf Configuration: > All the broker machines java.security.auth.login.config use this > configuration, the broker starts normally, the Kafka cluster is healthy, and > can provide services to the outside world > > {code:java} > KafkaServer { > org.apache.kafka.common.security.plain.PlainLoginModule required > username="admin" > password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS" > user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS" > user_alice="alice"; > org.apache.kafka.common.security.scram.ScramLoginModule required > username="admin_scram" > password="admin_scram_password"; > > }; > {code} > > > h3. 
2)kafkaProducer key Configuration > {code:java} > props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "******:9669"); > props.put(SaslConfigs.SASL_MECHANISM, "PLAIN"); > System.setProperty("java.security.auth.login.config","******\\kafka_Client_jaas.conf"); > props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); > props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, > "******\\client.truststore.jks"); > props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "******"); > props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); > KafkaProducer<String, String> producer = new KafkaProducer<String, > String>(props); > {code} > > h3. 3)kafka_Client_jaas.conf File,Simulate the JAAS configuration of > ClientBroker > > {code:java} > KafkaClient { > org.apache.kafka.common.security.plain.PlainLoginModule required > username="admin" > password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS" > user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS" > user_alice="alice"; > org.apache.kafka.common.security.scram.ScramLoginModule required > username="admin_scram" > password="DziatPgjXG82sFHc4O1EIuewmlvS"; > > }; > {code} > > > h3. Start the Debug for authentication process > h4. 1)“LoginSucceeded = true” in the login() method of LoginContext; place > breakpoint in this line code > Code debug picture see LoginContext_login_debug > > We can see: *The order of the elements in the two Credentials in the* > Subject *must be added in the order of all LoginModules in > kafka_Client_jaas.conf* > > h4. 2)Place breakpoint debugging to handle(Callback[] callbacks) of > SaslClientCallbackHandler > Code debug picture see SaslClientCallbackHandler_handle_debug > > You can see that the password field character array of PasswordCallback here > starts with “DziatPgjXG”, which corresponds to the password of > ScramLoginModule in the kafka_Client_jaas.conf file: > "DziatPgjXG82sFHc4O1EIuewmlvS" > > h4. 
3)Cancel all breakpoints and run the KafkaProducer program > You can see the Producer log > The real ip sensitive information of the broker in the log, I use ****** > instead of here > {code:java} > [main] INFO org.apache.kafka.common.security.authenticator.AbstractLogin - > Successfully logged in. > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.0.0 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: > 8cb0a5e9d3441962 > [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: > 1635534803428 > [kafka-producer-network-thread | producer-1] WARN > org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] > Bootstrap broker ******:9669 (id: -3 rack: null) disconnected > [kafka-producer-network-thread | producer-1] WARN > org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] > Bootstrap broker ******:9669 (id: -1 rack: null) disconnected > [kafka-producer-network-thread | producer-1] INFO > org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] > Failed authentication with ******/****** (Authentication failed: Invalid > username or password) > [kafka-producer-network-thread | producer-1] ERROR > org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] > Connection to node -2 (******/******:9669) failed authentication due to: > Authentication failed: Invalid username or password > [kafka-producer-network-thread | producer-1] WARN > org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] > Bootstrap broker ******:9669 (id: -2 rack: null) disconnected > [main] ERROR ProducerTest - the producer has a error:Authentication failed: > Invalid username or password > [kafka-producer-network-thread | producer-1] INFO > org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] > Failed authentication with ******/****** (Authentication failed: Invalid > username or password) > [kafka-producer-network-thread | 
producer-1] ERROR > org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] > Connection to node -3 (******/******:9669) failed authentication due to: > Authentication failed: Invalid username or password > [kafka-producer-network-thread | producer-1] WARN > org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] > Bootstrap broker ******:9669 (id: -3 rack: null) disconnected > [main] ERROR ProducerTest - the producer has a error:Authentication failed: > Invalid username or password > {code} > > You can see that the Producer log is also printing exceptions: Authentication > failed: Invalid username or password > Because the password of the PlainLoginModule expected by ServerBroker should > be “kJTVDziatPgjXG82sFHc4O1EIuewmlvS”, but the password on the KafkaProducer > side was wrong, it was taken as the password in the ScramLoginModule: > “DziatPgjXG82sFHc4O1EIuewmlvS” > > > h1. Root Cause: > h2. 1. The Credentials stored in the Subject contain the username&password of > {color:#ff0000}all LoginModules{color} in kafka_Client_jaas.conf > The JDK's LoginContext class initialization code is as follows > {code:java} > public LoginContext(String name, Subject subject, > CallbackHandler callbackHandler, > Configuration config) throws LoginException { > this.config = config; > if (config != null) { > creatorAcc = java.security.AccessController.getContext(); > } > init(name); > .... > } > {code} > > > When JAAS configuration is configured through > java.security.auth.login.config, the config type here is actually the > sun.security.provider.ConfigFile type of the JDK . > Then when the above init(name); code is executed, > config.getAppConfigurationEntry(name) will read all the LoginModules in the > kafka_Client_jaas.conf file again, and use the entries variable to complete > the assignment of the instance variable moduleStack in the following code. 
>
> {code:java}
> // get the LoginModules configured for this application
> AppConfigurationEntry[] entries =
> config.getAppConfigurationEntry(name);
> if (entries == null) {
>     if (sm != null && creatorAcc == null) {
>         sm.checkPermission(new AuthPermission
> ("createLoginContext." + OTHER));
>     }
>     entries = config.getAppConfigurationEntry(OTHER);
>     if (entries == null) {
>         MessageFormat form = new MessageFormat(ResourcesMgr.getString
> ("No.LoginModules.configured.for.name"));
>         Object[] source = {name};
>         throw new LoginException(form.format(source));
>     }
> }
> moduleStack = new ModuleInfo[entries.length];
> {code}
>
> *Eventually, the* Credentials *stored in the* Subject *will contain the
> username & password of all* LoginModules *in kafka_Client_jaas.conf.*
> h2. 2.In the JDK's Subject class, the order in which elements are read from
> the two kinds of Credentials may differ from the order in which they were
> stored. The read order depends on the
> {color:#ff0000}hash index{color} of the elements in a HashSet.
>
> h1. Suggestion & Solutions:
>
> h2. 1.Modification of JaasContext and JaasConfig class construction methods
> h3.
> 1)When KafkaClient initializes Map<String, JaasContext> jaasContexts, the
> constructor of JaasContext needs to receive the clientSaslMechanism
> configured by KafkaClient
> and use clientSaslMechanism to filter the entries returned by
> configuration.getAppConfigurationEntry(name). 
> When clientSaslMechanism = PLAIN, the final configurationEntries instance variable should contain only the PlainLoginModule content in kafka_Client_jaas.conf; when clientSaslMechanism = SCRAM-SHA-256, configurationEntries should contain only the ScramLoginModule content.
> {code:java}
> public JaasContext(String name, Type type, Configuration configuration, Password dynamicJaasConfig) {
>     this.name = name;
>     this.type = type;
>     this.configuration = configuration;
>     AppConfigurationEntry[] entries = configuration.getAppConfigurationEntry(name);
>     if (entries == null)
>         throw new IllegalArgumentException("Could not find a '" + name + "' entry in this JAAS configuration.");
>
>     // Add here the code that uses clientSaslMechanism to verify and filter entries
>
>     this.configurationEntries = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(entries)));
>     this.dynamicJaasConfig = dynamicJaasConfig;
> }
> {code}
> This has two advantages:
> (1) For mode == Mode.CLIENT, even if the JAAS configuration is supplied through java.security.auth.login.config, the JaasContext of a KafkaClient (KafkaProducer, KafkaConsumer, etc.) gets the same semantics as the custom configuration "listener.name.\{listenerName}.\{saslMechanism}.sasl.jaas.config" on the ClientBroker side. See [https://kafka.apache.org/documentation/#security_jaas_broker].
> The goal: in Map<String, JaasContext> jaasContexts, the configurationEntries of the JaasContext for each saslMechanism contain only the LoginModule section corresponding to that saslMechanism.
> (2) For mode == Mode.SERVER, consider SaslChannelBuilder#configure(Map<String, ?> configs):
>
> {code:java}
> public void configure(Map<String, ?> configs) throws KafkaException {
>     try {
>         this.configs = configs;
>         if (mode == Mode.SERVER) {
>             createServerCallbackHandlers(configs);
>             createConnectionsMaxReauthMsMap(configs);
>         } else
>             createClientCallbackHandler(configs);
>         for (Map.Entry<String, AuthenticateCallbackHandler> entry : saslCallbackHandlers.entrySet()) {
>             String mechanism = entry.getKey();
>             entry.getValue().configure(configs, mechanism, jaasContexts.get(mechanism).configurationEntries());
>         }
>         ......
>
> }
> {code}
> Because the configurationEntries in jaasContexts.get(mechanism) have already been verified and filtered, when entry.getValue().configure(...) runs, and in particular when PlainServerCallbackHandler executes configure(...), the jaasConfigEntries instance variable in PlainServerCallbackHandler will be purer: it will no longer contain the content of other LoginModules.
> h3. 2) The JaasConfig class needs to provide a new constructor
> In the JaasContext constructor, the Configuration configuration parameter is uniformly converted into a JaasConfig, because we cannot change the behavior of the init(String name) method in the JDK's LoginContext class.
> {code:java}
> public JaasConfig(String loginContextName, List<AppConfigurationEntry> configurationEntries) {
>     this.loginContextName = loginContextName;
>     if (configurationEntries == null || configurationEntries.size() == 0)
>         throw new IllegalArgumentException("JAAS config property does not contain any login modules");
>     this.configEntries = configurationEntries;
> }
> {code}
> Then, in the JaasContext constructor, use the verified and filtered configurationEntries to rebuild the configuration:
>
> {code:java}
> public JaasContext(String name, Type type, Configuration configuration, Password dynamicJaasConfig) {
>     this.name = name;
>     this.type = type;
>     AppConfigurationEntry[] entries = configuration.getAppConfigurationEntry(name);
>     if (entries == null)
>         throw new IllegalArgumentException("Could not find a '" + name + "' entry in this JAAS configuration.");
>
>     // Add here the code that uses clientSaslMechanism to verify and filter entries
>
>     this.configurationEntries = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(entries)));
>
>     // configuration is assigned exactly once, here
>     if (configuration instanceof JaasConfig)
>         this.configuration = configuration;
>     else
>         this.configuration = new JaasConfig(name, configurationEntries);
>
>     this.dynamicJaasConfig = dynamicJaasConfig;
> }
> {code}
>
> h2. 2. Each type of LoginModule in kafka_Client_jaas.conf should only be configured once
> For example, when a JaasContext is loaded from the following configuration (PlainLoginModule appears twice), the JaasContext constructor should throw an exception.
>
> {code:java}
> KafkaClient {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   username="admin"
>   password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_alice="alice";
>
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   username="admin2"
>   password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
>   user_tom="tom";
>   org.apache.kafka.common.security.scram.ScramLoginModule required
>   username="admin_scram"
>   password="DziatPgjXG82sFHc4O1EIuewmlvS";
>
> };
> {code}
>
>
> h2. 3. Discussion for Subject class in JDK
> When elements are fetched from the JDK's Subject class, the internal SecureSet is converted into a HashSet. I don't know the JDK's reasoning for this. We could try filing an issue with the JDK asking that the Set returned when fetching elements be a SecureSet or LinkedHashSet,
> like SecureSet → SecureSet/LinkedHashSet.
> But I recommend making the change on the application side, at the Kafka level.
> First of all, for both Map<String, JaasContext> jaasContexts and Map<String, LoginManager> loginManagers, the key is the mechanism. Therefore, *the value corresponding to each mechanism should be purer and should not contain the contents of the LoginModule of other mechanisms*. This also avoids having more than one element in each Credentials set of the Subject.
> Secondly, the JDK's design of the Subject's Credentials may not have considered the semantics of "*first element*" at all. What the JDK promises is to return every element in Credentials to you and to allow you to change the returned data.
> See the comments of the getPrivateCredentials method:
> {code:java}
> Return a Set of private credentials associated with this Subject that are instances or subclasses of the specified Class.
> The caller must have permission to access all of the requested Credentials, or a SecurityException will be thrown.
> The returned Set is not backed by this Subject's internal private Credential Set. A new Set is created and returned for each method invocation. Modifications to the returned Set will not affect the internal private Credential Set.
> Params:
> c – the returned Set of private credentials will all be instances of this class.
> Type parameters:
> <T> – the type of the class modeled by c
> Returns:
> a Set of private credentials that are instances of the specified Class.
> Throws:
> NullPointerException – if the specified Class is null.
> public <T> Set<T> getPrivateCredentials(Class<T> c)
> {code}
>
> However, when Kafka takes elements from the two Credentials sets in the Subject, it uses "*take the first element*" semantics, which may conflict with the JDK's intent when designing the Subject: a caller is expected to consider all the elements of each Credentials set for identity authentication.
> {code:java}
> subject.getPrivateCredentials(String.class).iterator().next()
> {code}
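The "take the first element" concern can be reproduced outside Kafka with a minimal sketch. It assumes no SecurityManager is installed, and the class name CredentialOrderSketch and the helper privateStringCredentials are hypothetical, not part of Kafka or the JDK. It stores two passwords as private credentials of one Subject, as the report describes happening when two LoginModules share a JAAS section, and reads them back with the call quoted above.

```java
import java.util.Set;
import javax.security.auth.Subject;

// Hypothetical demo class; not part of Kafka or the JDK.
class CredentialOrderSketch {

    // Stores each password as a private credential of a fresh Subject,
    // then reads them back through the typed accessor quoted above.
    static Set<String> privateStringCredentials(String... passwords) {
        Subject subject = new Subject();
        for (String password : passwords) {
            subject.getPrivateCredentials().add(password);
        }
        return subject.getPrivateCredentials(String.class);
    }

    public static void main(String[] args) {
        Set<String> credentials = privateStringCredentials(
                "kJTVDziatPgjXG82sFHc4O1EIuewmlvS",  // PLAIN password from the report
                "DziatPgjXG82sFHc4O1EIuewmlvS");     // SCRAM password from the report

        // Both passwords are present in the returned Set.
        System.out.println("all credentials: " + credentials);

        // Which one comes first is decided by the returned Set's iteration
        // order, which is not necessarily the insertion order.
        System.out.println("first element:   " + credentials.iterator().next());
    }
}
```

Because both passwords come back in one Set, a caller that needs the password for one specific mechanism has to select it explicitly instead of relying on iterator().next().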