rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r468149236
########## File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java ########## @@ -1214,6 +1215,64 @@ default AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlterati */ AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options); + /** + * <p>Describe all SASL/SCRAM credentials. + * + * <p>This is a convenience method for {@link #describeUserScramCredentials(List, DescribeUserScramCredentialsOptions)} + * + * @return The DescribeUserScramCredentialsResult. + */ + default DescribeUserScramCredentialsResult describeUserScramCredentials() { + return describeUserScramCredentials(null, new DescribeUserScramCredentialsOptions()); + } + + /** + * <p>Describe SASL/SCRAM credentials for the given users. + * + * <p>This is a convenience method for {@link #describeUserScramCredentials(List, DescribeUserScramCredentialsOptions)} + * + * @param users the users for which credentials are to be described; all users' credentials are described if null + * or empty. A user explicitly specified here that does not have a SCRAM credential will not appear + * in the results. Review comment: Hmm, good question. The KIP doesn't state what to do here. @cmccabe thoughts? ########## File path: core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala ########## @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + + +import java.nio.charset.StandardCharsets +import java.util +import java.util.Properties + +import kafka.network.SocketServer +import kafka.security.authorizer.AclAuthorizer +import org.apache.kafka.clients.admin.ScramMechanism +import org.apache.kafka.common.acl.AclOperation +import org.apache.kafka.common.message.{AlterUserScramCredentialsRequestData, DescribeUserScramCredentialsRequestData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterUserScramCredentialsRequest, AlterUserScramCredentialsResponse, DescribeUserScramCredentialsRequest, DescribeUserScramCredentialsResponse} +import org.apache.kafka.common.resource.ResourceType +import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder} +import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult} +import org.junit.Assert._ +import org.junit.{Before, Test} + +import scala.jdk.CollectionConverters._ + +/** + * Test AlterUserScramCredentialsRequest/Response API for the cases where either no credentials are altered + * or failure is expected due to lack of authorization, sending the request to a non-controller broker, or some other issue. + * Also tests the Alter and Describe APIs for the case where credentials are successfully altered/described. 
+ */ +class AlterUserScramCredentialsRequestTest extends BaseRequestTest { + override def brokerPropertyOverrides(properties: Properties): Unit = { + properties.put(KafkaConfig.ControlledShutdownEnableProp, "false") + properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AlterCredentialsTest.TestAuthorizer].getName) + properties.put(KafkaConfig.PrincipalBuilderClassProp, classOf[AlterCredentialsTest.TestPrincipalBuilder].getName) + } + + @Before + override def setUp(): Unit = { + AlterCredentialsTest.principal = KafkaPrincipal.ANONYMOUS // default is to be authorized + super.setUp() + } + + @Test + def testAlterNothing(): Unit = { + val request = new AlterUserScramCredentialsRequest.Builder( + new AlterUserScramCredentialsRequestData() + .setDeletions(new util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialDeletion]) + .setUpsertions(new util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialUpsertion])).build() + val response = sendAlterUserScramCredentialsRequest(request) + + val results = response.data.results + assertEquals(0, results.size) + } + + @Test + def testAlterNothingNotAuthorized(): Unit = { + AlterCredentialsTest.principal = AlterCredentialsTest.UnauthorizedPrincipal + + val request = new AlterUserScramCredentialsRequest.Builder( + new AlterUserScramCredentialsRequestData() + .setDeletions(new util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialDeletion]) + .setUpsertions(new util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialUpsertion])).build() + val response = sendAlterUserScramCredentialsRequest(request) + + val results = response.data.results + assertEquals(0, results.size) + } + + @Test + def testAlterSomethingNotAuthorized(): Unit = { + AlterCredentialsTest.principal = AlterCredentialsTest.UnauthorizedPrincipal + + val request = new AlterUserScramCredentialsRequest.Builder( + new AlterUserScramCredentialsRequestData() + .setDeletions(util.Arrays.asList(new 
AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName("name1").setMechanism(ScramMechanism.SCRAM_SHA_256.getType))) + .setUpsertions(util.Arrays.asList(new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName("name2").setMechanism(ScramMechanism.SCRAM_SHA_512.getType)))).build() + val response = sendAlterUserScramCredentialsRequest(request) + + val results = response.data.results + assertEquals(2, results.size) + assertTrue("Expected not authorized", + results.get(0).errorCode == Errors.CLUSTER_AUTHORIZATION_FAILED.code && results.get(1).errorCode == Errors.CLUSTER_AUTHORIZATION_FAILED.code) + } + + @Test + def testAlterSameThingTwice(): Unit = { + val deletion1 = new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName("name1").setMechanism(ScramMechanism.SCRAM_SHA_256.getType) + val deletion2 = new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName("name2").setMechanism(ScramMechanism.SCRAM_SHA_256.getType) + val upsertion1 = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName("name1").setMechanism(ScramMechanism.SCRAM_SHA_256.getType) + .setIterations(-1).setSalt("salt".getBytes).setSaltedPassword("saltedPassword".getBytes) + val upsertion2 = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName("name2").setMechanism(ScramMechanism.SCRAM_SHA_256.getType) + .setIterations(-1).setSalt("salt".getBytes).setSaltedPassword("saltedPassword".getBytes) + val requests = List ( + new AlterUserScramCredentialsRequest.Builder( + new AlterUserScramCredentialsRequestData() + .setDeletions(util.Arrays.asList(deletion1, deletion1)) + .setUpsertions(util.Arrays.asList(upsertion2, upsertion2))).build(), + new AlterUserScramCredentialsRequest.Builder( + new AlterUserScramCredentialsRequestData() + .setDeletions(util.Arrays.asList(deletion1, deletion2)) + .setUpsertions(util.Arrays.asList(upsertion1, upsertion2))).build(), + ) + requests.foreach(request => { + val 
response = sendAlterUserScramCredentialsRequest(request) + val results = response.data.results + assertEquals(2, results.size) + assertTrue("Expected error when altering the same credential twice in a single request", + results.get(0).errorCode == Errors.INVALID_REQUEST.code && results.get(1).errorCode == Errors.INVALID_REQUEST.code) + }) + } + + @Test + def testAlterEmptyUser(): Unit = { Review comment: Added a test for it in `UserScramCredentialsCommandTest`. We can't test for it here because we get a salted password here, and I don't think it is possible for that to be an empty string and allow a successful SASL/SCRAM authentication. ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/ScramCredentialInfo.java ########## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import java.util.Objects; + +/** + * Mechanism and iterations for a SASL/SCRAM credential associated with a user. 
+ * + * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API">KIP-554: Add Broker-side SCRAM Config API</a> + */ +public class ScramCredentialInfo { + private final ScramMechanism mechanism; + private final int iterations; + + /** + * + * @param mechanism the required mechanism + * @param iterations the number of iterations used when creating the credential + */ + public ScramCredentialInfo(ScramMechanism mechanism, int iterations) { + this.mechanism = Objects.requireNonNull(mechanism); + this.iterations = iterations; + } + + /** + * + * @return the mechanism + */ + public ScramMechanism getMechanism() { + return mechanism; + } + + /** + * + * @return the number of iterations used when creating the credential Review comment: TBD. No change needed if we are getting rid of `-1` as a special value. ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) { return new AlterClientQuotasResult(Collections.unmodifiableMap(futures)); } + @Override + public DescribeUserScramCredentialsResult describeUserScramCredentials(List<String> users, DescribeUserScramCredentialsOptions options) { + final KafkaFutureImpl<Map<String, UserScramCredentialsDescription>> future = new KafkaFutureImpl<>(); + final long now = time.milliseconds(); + Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()), + new ControllerNodeProvider()) { + @Override + public DescribeUserScramCredentialsRequest.Builder createRequest(int timeoutMs) { + return new DescribeUserScramCredentialsRequest.Builder( + new DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user -> + new DescribeUserScramCredentialsRequestData.UserName().setName(user)).collect(Collectors.toList()))); + } + + @Override + public void handleResponse(AbstractResponse abstractResponse) { + 
DescribeUserScramCredentialsResponse response = (DescribeUserScramCredentialsResponse) abstractResponse; + Errors error = Errors.forCode(response.data().error()); + switch (error) { + case NONE: + DescribeUserScramCredentialsResponseData data = response.data(); + future.complete(data.userScramCredentials().stream().collect(Collectors.toMap( + DescribeUserScramCredentialsResponseData.UserScramCredential::name, + userScramCredential -> { + List<ScramCredentialInfo> scramCredentialInfos = userScramCredential.credentialInfos().stream().map( + credentialInfo -> new ScramCredentialInfo(ScramMechanism.fromType(credentialInfo.mechanism()), credentialInfo.iterations())) + .collect(Collectors.toList()); + return new UserScramCredentialsDescription(userScramCredential.name(), scramCredentialInfos); + }))); + break; + case NOT_CONTROLLER: + handleNotControllerError(error); + break; + default: + future.completeExceptionally(new ApiError(error, response.data().errorMessage()).exception()); + break; + } + } + + @Override + void handleFailure(Throwable throwable) { + future.completeExceptionally(throwable); + } + }; + runnable.call(call, now); + return new DescribeUserScramCredentialsResult(future); + } + + @Override + public AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScramCredentialAlteration> alterations, + AlterUserScramCredentialsOptions options) { + final long now = time.milliseconds(); + final Map<String, KafkaFutureImpl<Void>> futures = new HashMap<>(); + for (UserScramCredentialAlteration alteration: alterations) { + futures.put(alteration.getUser(), new KafkaFutureImpl<>()); + } + final Map<String, Exception> userIllegalAlterationExceptions = new HashMap<>(); + // We need to keep track of users with deletions of an unknown SCRAM mechanism + alterations.stream().filter(a -> a instanceof UserScramCredentialDeletion).forEach(alteration -> { + UserScramCredentialDeletion deletion = (UserScramCredentialDeletion) alteration; + ScramMechanism mechanism = 
deletion.getMechanism(); + if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) { + userIllegalAlterationExceptions.put(deletion.getUser(), new InvalidRequestException("Unknown SCRAM mechanism")); + } + }); + // Creating an upsertion may throw InvalidKeyException or NoSuchAlgorithmException, + // so keep track of which users are affected by such a failure and immediately fail all their alterations + final Map<String, Map<ScramMechanism, AlterUserScramCredentialsRequestData.ScramCredentialUpsertion>> userInsertions = new HashMap<>(); + alterations.stream().filter(a -> a instanceof UserScramCredentialUpsertion) + .filter(alteration -> !userIllegalAlterationExceptions.containsKey(alteration.getUser())) + .forEach(alteration -> { + UserScramCredentialUpsertion upsertion = (UserScramCredentialUpsertion) alteration; + String user = upsertion.getUser(); + try { + ScramMechanism mechanism = upsertion.getInfo().getMechanism(); + if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) + throw new InvalidRequestException("Unknown SCRAM mechanism"); + userInsertions.putIfAbsent(user, new HashMap<>()); + userInsertions.get(user).put(mechanism, getScramCredentialUpsertion(upsertion)); + } catch (Exception e) { + // we might overwrite an exception from a previous upsertion, but we don't really care + // since we just need to mark this user as having at least one illegal alteration + // and make an exception instance available for completing the corresponding future exceptionally + userIllegalAlterationExceptions.put(user, e); + } + }); + // fail any users immediately that have an illegal alteration as identified above + userIllegalAlterationExceptions.entrySet().stream().forEach(entry -> { + futures.get(entry.getKey()).completeExceptionally(entry.getValue()); + }); + + // submit alterations for users that do not have an illegal upsertion as identified above + Call call = new Call("alterUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()), + new 
ControllerNodeProvider()) { + @Override + public AlterUserScramCredentialsRequest.Builder createRequest(int timeoutMs) { + return new AlterUserScramCredentialsRequest.Builder( + new AlterUserScramCredentialsRequestData().setUpsertions(alterations.stream() + .filter(a -> a instanceof UserScramCredentialUpsertion) + .filter(a -> !userIllegalAlterationExceptions.containsKey(a.getUser())) + .map(a -> userInsertions.get(a.getUser()).get(((UserScramCredentialUpsertion) a).getInfo().getMechanism())) + .collect(Collectors.toList())) + .setDeletions(alterations.stream() + .filter(a -> a instanceof UserScramCredentialDeletion) + .filter(a -> !userIllegalAlterationExceptions.containsKey(a.getUser())) + .map(d -> + getScramCredentialDeletion((UserScramCredentialDeletion) d)).collect(Collectors.toList()))); + } + + @Override + public void handleResponse(AbstractResponse abstractResponse) { + AlterUserScramCredentialsResponse response = (AlterUserScramCredentialsResponse) abstractResponse; + // Check for controller change + for (Errors error : response.errorCounts().keySet()) { + if (error == Errors.NOT_CONTROLLER) { + handleNotControllerError(error); + } + } + response.data().results().forEach(result -> { + KafkaFutureImpl<Void> future = futures.get(result.user()); + if (future == null) { + log.warn("Server response mentioned unknown user {}", result.user()); + } else { + Errors error = Errors.forCode(result.errorCode()); + if (error != Errors.NONE) { + future.completeExceptionally(error.exception(result.errorMessage())); + } else { + future.complete(null); + } + } + }); + completeUnrealizedFutures( + futures.entrySet().stream(), + user -> "The broker response did not contain a result for user " + user); + } + + @Override + void handleFailure(Throwable throwable) { + completeAllExceptionally(futures.values(), throwable); + } + }; + runnable.call(call, now); + return new AlterUserScramCredentialsResult(new HashMap<>(futures)); + } + + private static 
AlterUserScramCredentialsRequestData.ScramCredentialUpsertion getScramCredentialUpsertion(UserScramCredentialUpsertion u) throws InvalidKeyException, NoSuchAlgorithmException { + AlterUserScramCredentialsRequestData.ScramCredentialUpsertion retval = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion(); + return retval.setName(u.getUser()) + .setMechanism(u.getInfo().getMechanism().getType()) + .setIterations(u.getInfo().getIterations()) + .setSalt(u.getSalt()) + .setSaltedPassword(getSaltedPasword(u.getInfo().getMechanism(), u.getPassword(), u.getSalt(), u.getInfo().getIterations())); + } + + private static AlterUserScramCredentialsRequestData.ScramCredentialDeletion getScramCredentialDeletion(UserScramCredentialDeletion d) { + return new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName(d.getUser()).setMechanism(d.getMechanism().getType()); + } + + private static byte[] getSaltedPasword(ScramMechanism publicScramMechanism, byte[] password, byte[] salt, int iterations) throws NoSuchAlgorithmException, InvalidKeyException { + return new ScramFormatter(org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(publicScramMechanism.getMechanismName())) + .hi(password, salt, iterations); Review comment: Yeah, I think there is a fundamental problem with allowing the value `-1` for iterations. The KIP says "`Note that if the number of iterations is set to -1, the server-side default will be used.`". But we are on the client at this point in the code, and there is no concept for "server-side default" for SCRAM iterations in Kafka. And unfortunately since we are salting the password here, we need to know the number of iterations. So I think we need to do either of the following: 1. Add an ability to define server-side default number of iterations per SASL/SCRAM mechanism in Kafka and allow clients to learn them. 2. Get rid of `-1` as a special value. 
It's pretty clear to me that (2) is the way to go, but @rajinisivaram and @cmccabe please chime in. ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/ScramMechanism.java ########## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +/** + * Representation of a SASL/SCRAM Mechanism. 
+ * + * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API">KIP-554: Add Broker-side SCRAM Config API</a> + */ +public enum ScramMechanism { + UNKNOWN((byte) 0), + SCRAM_SHA_256((byte) 1), + SCRAM_SHA_512((byte) 2); + + /** + * + * @param type the type indicator + * @return the instance corresponding to the given type indicator, otherwise {@link #UNKNOWN} + */ + public static ScramMechanism fromType(byte type) { + for (ScramMechanism scramMechanism : ScramMechanism.values()) { + if (scramMechanism.type == type) { + return scramMechanism; + } + } + return UNKNOWN; + } + + /** + * + * @param mechanismName the SASL SCRAM mechanism name + * @return the corresponding SASL SCRAM mechanism enum, otherwise {@link #UNKNOWN} + * @see <a href="https://tools.ietf.org/html/rfc5802#section-4> + * Salted Challenge Response Authentication Mechanism (SCRAM) SASL and GSS-API Mechanisms, Section 4</a> + */ + public static ScramMechanism fromMechanismName(String mechanismName) { + ScramMechanism retvalFoundMechanism = ScramMechanism.valueOf(mechanismName.replace('-', '_')); + return retvalFoundMechanism != null ? retvalFoundMechanism : UNKNOWN; + } + + /** + * + * @return the corresponding SASL SCRAM mechanism name + * @see <a href="https://tools.ietf.org/html/rfc5802#section-4> + * Salted Challenge Response Authentication Mechanism (SCRAM) SASL and GSS-API Mechanisms, Section 4</a> + */ + public String getMechanismName() { Review comment: I made the changes here and also removed all `get` prefixes on getters within `ScramCredentialInfo` and `UserScramCredential{Alteration,Deletion,Upsertion,Description}` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org