rajinisivaram commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r467848781
########## File path: clients/src/main/java/org/apache/kafka/clients/admin/ScramMechanism.java ########## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +/** + * Representation of a SASL/SCRAM Mechanism. 
+ * + * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API">KIP-554: Add Broker-side SCRAM Config API</a> + */ +public enum ScramMechanism { + UNKNOWN((byte) 0), + SCRAM_SHA_256((byte) 1), + SCRAM_SHA_512((byte) 2); + + /** + * + * @param type the type indicator + * @return the instance corresponding to the given type indicator, otherwise {@link #UNKNOWN} + */ + public static ScramMechanism fromType(byte type) { + for (ScramMechanism scramMechanism : ScramMechanism.values()) { + if (scramMechanism.type == type) { + return scramMechanism; + } + } + return UNKNOWN; + } + + /** + * + * @param mechanismName the SASL SCRAM mechanism name + * @return the corresponding SASL SCRAM mechanism enum, otherwise {@link #UNKNOWN} + * @see <a href="https://tools.ietf.org/html/rfc5802#section-4> + * Salted Challenge Response Authentication Mechanism (SCRAM) SASL and GSS-API Mechanisms, Section 4</a> + */ + public static ScramMechanism fromMechanismName(String mechanismName) { + ScramMechanism retvalFoundMechanism = ScramMechanism.valueOf(mechanismName.replace('-', '_')); + return retvalFoundMechanism != null ? retvalFoundMechanism : UNKNOWN; + } + + /** + * + * @return the corresponding SASL SCRAM mechanism name + * @see <a href="https://tools.ietf.org/html/rfc5802#section-4> + * Salted Challenge Response Authentication Mechanism (SCRAM) SASL and GSS-API Mechanisms, Section 4</a> + */ + public String getMechanismName() { Review comment: We don't use `get` prefix elsewhere, just `mechanismName()`? ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java ########## @@ -1214,6 +1215,64 @@ default AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlterati */ AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options); + /** + * <p>Describe all SASL/SCRAM credentials. 
+ * + * <p>This is a convenience method for {@link #describeUserScramCredentials(List, DescribeUserScramCredentialsOptions)} + * + * @return The DescribeUserScramCredentialsResult. + */ + default DescribeUserScramCredentialsResult describeUserScramCredentials() { + return describeUserScramCredentials(null, new DescribeUserScramCredentialsOptions()); + } + + /** + * <p>Describe SASL/SCRAM credentials for the given users. + * + * <p>This is a convenience method for {@link #describeUserScramCredentials(List, DescribeUserScramCredentialsOptions)} + * + * @param users the users for which credentials are to be described; all users' credentials are described if null + * or empty. A user explicitly specified here that does not have a SCRAM credential will not appear + * in the results. Review comment: Should we throw an exception for users which don't exist to be consistent with other APIs? ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/UserScramCredentialsDescription.java ########## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.clients.admin; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +/** + * Representation of all SASL/SCRAM credentials associated with a user that can be retrieved. + * + * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API">KIP-554: Add Broker-side SCRAM Config API</a> + */ +public class UserScramCredentialsDescription { + private final String name; + private final List<ScramCredentialInfo> infos; + + /** + * + * @param name the required user name + * @param infos the required SASL/SCRAM credential representations for the user + */ + public UserScramCredentialsDescription(String name, List<ScramCredentialInfo> infos) { + this.name = Objects.requireNonNull(name); + this.infos = Collections.unmodifiableList(new ArrayList<>(Objects.requireNonNull(infos))); + } + + /** + * + * @return the user name + */ + public String getName() { Review comment: Remove `get` prefix ########## File path: core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala ########## @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server + + +import java.nio.charset.StandardCharsets +import java.util +import java.util.Properties + +import kafka.network.SocketServer +import kafka.security.authorizer.AclAuthorizer +import org.apache.kafka.clients.admin.ScramMechanism +import org.apache.kafka.common.acl.AclOperation +import org.apache.kafka.common.message.{AlterUserScramCredentialsRequestData, DescribeUserScramCredentialsRequestData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterUserScramCredentialsRequest, AlterUserScramCredentialsResponse, DescribeUserScramCredentialsRequest, DescribeUserScramCredentialsResponse} +import org.apache.kafka.common.resource.ResourceType +import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder} +import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult} +import org.junit.Assert._ +import org.junit.{Before, Test} + +import scala.jdk.CollectionConverters._ + +/** + * Test AlterUserScramCredentialsRequest/Response API for the cases where either no credentials are altered + * or failure is expected due to lack of authorization, sending the request to a non-controller broker, or some other issue. + * Also tests the Alter and Describe APIs for the case where credentials are successfully altered/described. 
+ */ +class AlterUserScramCredentialsRequestTest extends BaseRequestTest { + override def brokerPropertyOverrides(properties: Properties): Unit = { + properties.put(KafkaConfig.ControlledShutdownEnableProp, "false") + properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AlterCredentialsTest.TestAuthorizer].getName) + properties.put(KafkaConfig.PrincipalBuilderClassProp, classOf[AlterCredentialsTest.TestPrincipalBuilder].getName) + } + + @Before + override def setUp(): Unit = { + AlterCredentialsTest.principal = KafkaPrincipal.ANONYMOUS // default is to be authorized + super.setUp() + } + + @Test + def testAlterNothing(): Unit = { + val request = new AlterUserScramCredentialsRequest.Builder( + new AlterUserScramCredentialsRequestData() + .setDeletions(new util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialDeletion]) + .setUpsertions(new util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialUpsertion])).build() + val response = sendAlterUserScramCredentialsRequest(request) + + val results = response.data.results + assertEquals(0, results.size) + } + + @Test + def testAlterNothingNotAuthorized(): Unit = { + AlterCredentialsTest.principal = AlterCredentialsTest.UnauthorizedPrincipal + + val request = new AlterUserScramCredentialsRequest.Builder( + new AlterUserScramCredentialsRequestData() + .setDeletions(new util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialDeletion]) + .setUpsertions(new util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialUpsertion])).build() + val response = sendAlterUserScramCredentialsRequest(request) + + val results = response.data.results + assertEquals(0, results.size) + } + + @Test + def testAlterSomethingNotAuthorized(): Unit = { + AlterCredentialsTest.principal = AlterCredentialsTest.UnauthorizedPrincipal + + val request = new AlterUserScramCredentialsRequest.Builder( + new AlterUserScramCredentialsRequestData() + .setDeletions(util.Arrays.asList(new 
AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName("name1").setMechanism(ScramMechanism.SCRAM_SHA_256.getType))) + .setUpsertions(util.Arrays.asList(new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName("name2").setMechanism(ScramMechanism.SCRAM_SHA_512.getType)))).build() + val response = sendAlterUserScramCredentialsRequest(request) + + val results = response.data.results + assertEquals(2, results.size) + assertTrue("Expected not authorized", + results.get(0).errorCode == Errors.CLUSTER_AUTHORIZATION_FAILED.code && results.get(1).errorCode == Errors.CLUSTER_AUTHORIZATION_FAILED.code) + } + + @Test + def testAlterSameThingTwice(): Unit = { + val deletion1 = new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName("name1").setMechanism(ScramMechanism.SCRAM_SHA_256.getType) + val deletion2 = new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName("name2").setMechanism(ScramMechanism.SCRAM_SHA_256.getType) + val upsertion1 = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName("name1").setMechanism(ScramMechanism.SCRAM_SHA_256.getType) + .setIterations(-1).setSalt("salt".getBytes).setSaltedPassword("saltedPassword".getBytes) + val upsertion2 = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName("name2").setMechanism(ScramMechanism.SCRAM_SHA_256.getType) + .setIterations(-1).setSalt("salt".getBytes).setSaltedPassword("saltedPassword".getBytes) + val requests = List ( + new AlterUserScramCredentialsRequest.Builder( + new AlterUserScramCredentialsRequestData() + .setDeletions(util.Arrays.asList(deletion1, deletion1)) + .setUpsertions(util.Arrays.asList(upsertion2, upsertion2))).build(), + new AlterUserScramCredentialsRequest.Builder( + new AlterUserScramCredentialsRequestData() + .setDeletions(util.Arrays.asList(deletion1, deletion2)) + .setUpsertions(util.Arrays.asList(upsertion1, upsertion2))).build(), + ) + requests.foreach(request => { + val 
response = sendAlterUserScramCredentialsRequest(request) + val results = response.data.results + assertEquals(2, results.size) + assertTrue("Expected error when altering the same credential twice in a single request", + results.get(0).errorCode == Errors.INVALID_REQUEST.code && results.get(1).errorCode == Errors.INVALID_REQUEST.code) + }) + } + + @Test + def testAlterEmptyUser(): Unit = { Review comment: Do we test for empty password? ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) { return new AlterClientQuotasResult(Collections.unmodifiableMap(futures)); } + @Override + public DescribeUserScramCredentialsResult describeUserScramCredentials(List<String> users, DescribeUserScramCredentialsOptions options) { + final KafkaFutureImpl<Map<String, UserScramCredentialsDescription>> future = new KafkaFutureImpl<>(); + final long now = time.milliseconds(); + Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()), + new ControllerNodeProvider()) { + @Override + public DescribeUserScramCredentialsRequest.Builder createRequest(int timeoutMs) { + return new DescribeUserScramCredentialsRequest.Builder( + new DescribeUserScramCredentialsRequestData().setUsers(users.stream().map(user -> + new DescribeUserScramCredentialsRequestData.UserName().setName(user)).collect(Collectors.toList()))); + } + + @Override + public void handleResponse(AbstractResponse abstractResponse) { + DescribeUserScramCredentialsResponse response = (DescribeUserScramCredentialsResponse) abstractResponse; + Errors error = Errors.forCode(response.data().error()); + switch (error) { + case NONE: + DescribeUserScramCredentialsResponseData data = response.data(); + future.complete(data.userScramCredentials().stream().collect(Collectors.toMap( + DescribeUserScramCredentialsResponseData.UserScramCredential::name, + userScramCredential -> { + 
List<ScramCredentialInfo> scramCredentialInfos = userScramCredential.credentialInfos().stream().map( + credentialInfo -> new ScramCredentialInfo(ScramMechanism.fromType(credentialInfo.mechanism()), credentialInfo.iterations())) + .collect(Collectors.toList()); + return new UserScramCredentialsDescription(userScramCredential.name(), scramCredentialInfos); + }))); + break; + case NOT_CONTROLLER: + handleNotControllerError(error); + break; + default: + future.completeExceptionally(new ApiError(error, response.data().errorMessage()).exception()); + break; + } + } + + @Override + void handleFailure(Throwable throwable) { + future.completeExceptionally(throwable); + } + }; + runnable.call(call, now); + return new DescribeUserScramCredentialsResult(future); + } + + @Override + public AlterUserScramCredentialsResult alterUserScramCredentials(List<UserScramCredentialAlteration> alterations, + AlterUserScramCredentialsOptions options) { + final long now = time.milliseconds(); + final Map<String, KafkaFutureImpl<Void>> futures = new HashMap<>(); + for (UserScramCredentialAlteration alteration: alterations) { + futures.put(alteration.getUser(), new KafkaFutureImpl<>()); + } + final Map<String, Exception> userIllegalAlterationExceptions = new HashMap<>(); + // We need to keep track of users with deletions of an unknown SCRAM mechanism + alterations.stream().filter(a -> a instanceof UserScramCredentialDeletion).forEach(alteration -> { + UserScramCredentialDeletion deletion = (UserScramCredentialDeletion) alteration; + ScramMechanism mechanism = deletion.getMechanism(); + if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) { + userIllegalAlterationExceptions.put(deletion.getUser(), new InvalidRequestException("Unknown SCRAM mechanism")); + } + }); + // Creating an upsertion may throw InvalidKeyException or NoSuchAlgorithmException, + // so keep track of which users are affected by such a failure and immediately fail all their alterations + final Map<String, 
Map<ScramMechanism, AlterUserScramCredentialsRequestData.ScramCredentialUpsertion>> userInsertions = new HashMap<>(); + alterations.stream().filter(a -> a instanceof UserScramCredentialUpsertion) + .filter(alteration -> !userIllegalAlterationExceptions.containsKey(alteration.getUser())) + .forEach(alteration -> { + UserScramCredentialUpsertion upsertion = (UserScramCredentialUpsertion) alteration; + String user = upsertion.getUser(); + try { + ScramMechanism mechanism = upsertion.getInfo().getMechanism(); + if (mechanism == null || mechanism == ScramMechanism.UNKNOWN) + throw new InvalidRequestException("Unknown SCRAM mechanism"); + userInsertions.putIfAbsent(user, new HashMap<>()); + userInsertions.get(user).put(mechanism, getScramCredentialUpsertion(upsertion)); + } catch (Exception e) { + // we might overwrite an exception from a previous upsertion, but we don't really care + // since we just need to mark this user as having at least one illegal alteration + // and make an exception instance available for completing the corresponding future exceptionally + userIllegalAlterationExceptions.put(user, e); + } + }); + // fail any users immediately that have an illegal alteration as identified above + userIllegalAlterationExceptions.entrySet().stream().forEach(entry -> { + futures.get(entry.getKey()).completeExceptionally(entry.getValue()); + }); + + // submit alterations for users that do not have an illegal upsertion as identified above + Call call = new Call("alterUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()), + new ControllerNodeProvider()) { + @Override + public AlterUserScramCredentialsRequest.Builder createRequest(int timeoutMs) { + return new AlterUserScramCredentialsRequest.Builder( + new AlterUserScramCredentialsRequestData().setUpsertions(alterations.stream() + .filter(a -> a instanceof UserScramCredentialUpsertion) + .filter(a -> !userIllegalAlterationExceptions.containsKey(a.getUser())) + .map(a -> 
userInsertions.get(a.getUser()).get(((UserScramCredentialUpsertion) a).getInfo().getMechanism())) + .collect(Collectors.toList())) + .setDeletions(alterations.stream() + .filter(a -> a instanceof UserScramCredentialDeletion) + .filter(a -> !userIllegalAlterationExceptions.containsKey(a.getUser())) + .map(d -> + getScramCredentialDeletion((UserScramCredentialDeletion) d)).collect(Collectors.toList()))); + } + + @Override + public void handleResponse(AbstractResponse abstractResponse) { + AlterUserScramCredentialsResponse response = (AlterUserScramCredentialsResponse) abstractResponse; + // Check for controller change + for (Errors error : response.errorCounts().keySet()) { + if (error == Errors.NOT_CONTROLLER) { + handleNotControllerError(error); + } + } + response.data().results().forEach(result -> { + KafkaFutureImpl<Void> future = futures.get(result.user()); + if (future == null) { + log.warn("Server response mentioned unknown user {}", result.user()); + } else { + Errors error = Errors.forCode(result.errorCode()); + if (error != Errors.NONE) { + future.completeExceptionally(error.exception(result.errorMessage())); + } else { + future.complete(null); + } + } + }); + completeUnrealizedFutures( + futures.entrySet().stream(), + user -> "The broker response did not contain a result for user " + user); + } + + @Override + void handleFailure(Throwable throwable) { + completeAllExceptionally(futures.values(), throwable); + } + }; + runnable.call(call, now); + return new AlterUserScramCredentialsResult(new HashMap<>(futures)); + } + + private static AlterUserScramCredentialsRequestData.ScramCredentialUpsertion getScramCredentialUpsertion(UserScramCredentialUpsertion u) throws InvalidKeyException, NoSuchAlgorithmException { + AlterUserScramCredentialsRequestData.ScramCredentialUpsertion retval = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion(); + return retval.setName(u.getUser()) + .setMechanism(u.getInfo().getMechanism().getType()) + 
.setIterations(u.getInfo().getIterations()) + .setSalt(u.getSalt()) + .setSaltedPassword(getSaltedPasword(u.getInfo().getMechanism(), u.getPassword(), u.getSalt(), u.getInfo().getIterations())); + } + + private static AlterUserScramCredentialsRequestData.ScramCredentialDeletion getScramCredentialDeletion(UserScramCredentialDeletion d) { + return new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName(d.getUser()).setMechanism(d.getMechanism().getType()); + } + + private static byte[] getSaltedPasword(ScramMechanism publicScramMechanism, byte[] password, byte[] salt, int iterations) throws NoSuchAlgorithmException, InvalidKeyException { + return new ScramFormatter(org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(publicScramMechanism.getMechanismName())) + .hi(password, salt, iterations); Review comment: Iterations can be -1 here? Won't we end up sending a password without applying the salt properly? ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/ScramMechanism.java ########## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +/** + * Representation of a SASL/SCRAM Mechanism. 
+ * + * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API">KIP-554: Add Broker-side SCRAM Config API</a> + */ +public enum ScramMechanism { + UNKNOWN((byte) 0), + SCRAM_SHA_256((byte) 1), + SCRAM_SHA_512((byte) 2); + + /** + * + * @param type the type indicator + * @return the instance corresponding to the given type indicator, otherwise {@link #UNKNOWN} + */ + public static ScramMechanism fromType(byte type) { + for (ScramMechanism scramMechanism : ScramMechanism.values()) { + if (scramMechanism.type == type) { + return scramMechanism; + } + } + return UNKNOWN; + } + + /** + * + * @param mechanismName the SASL SCRAM mechanism name + * @return the corresponding SASL SCRAM mechanism enum, otherwise {@link #UNKNOWN} + * @see <a href="https://tools.ietf.org/html/rfc5802#section-4> + * Salted Challenge Response Authentication Mechanism (SCRAM) SASL and GSS-API Mechanisms, Section 4</a> + */ + public static ScramMechanism fromMechanismName(String mechanismName) { + ScramMechanism retvalFoundMechanism = ScramMechanism.valueOf(mechanismName.replace('-', '_')); + return retvalFoundMechanism != null ? retvalFoundMechanism : UNKNOWN; + } + + /** + * + * @return the corresponding SASL SCRAM mechanism name + * @see <a href="https://tools.ietf.org/html/rfc5802#section-4> + * Salted Challenge Response Authentication Mechanism (SCRAM) SASL and GSS-API Mechanisms, Section 4</a> + */ + public String getMechanismName() { + return this.mechanismName; + } + + /** + * + * @return the type indicator for this SASL SCRAM mechanism + */ + public byte getType() { Review comment: As before, `type()`? ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/ScramCredentialInfo.java ########## @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import java.util.Objects; + +/** + * Mechanism and iterations for a SASL/SCRAM credential associated with a user. + * + * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API">KIP-554: Add Broker-side SCRAM Config API</a> + */ +public class ScramCredentialInfo { + private final ScramMechanism mechanism; + private final int iterations; + + /** + * + * @param mechanism the required mechanism + * @param iterations the number of iterations used when creating the credential + */ + public ScramCredentialInfo(ScramMechanism mechanism, int iterations) { + this.mechanism = Objects.requireNonNull(mechanism); + this.iterations = iterations; + } + + /** + * + * @return the mechanism + */ + public ScramMechanism getMechanism() { + return mechanism; + } + + /** + * + * @return the number of iterations used when creating the credential Review comment: or -1? ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/UserScramCredentialsDescription.java ########## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +/** + * Representation of all SASL/SCRAM credentials associated with a user that can be retrieved. + * + * @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API">KIP-554: Add Broker-side SCRAM Config API</a> + */ +public class UserScramCredentialsDescription { + private final String name; + private final List<ScramCredentialInfo> infos; + + /** + * + * @param name the required user name + * @param infos the required SASL/SCRAM credential representations for the user + */ + public UserScramCredentialsDescription(String name, List<ScramCredentialInfo> infos) { + this.name = Objects.requireNonNull(name); + this.infos = Collections.unmodifiableList(new ArrayList<>(Objects.requireNonNull(infos))); + } + + /** + * + * @return the user name + */ + public String getName() { + return name; + } + + /** + * + * @return the unmodifiable list of SASL/SCRAM credential representations for the user + */ + public List<ScramCredentialInfo> getInfos() { Review comment: `credentialInfos()`? 
########## File path: core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala ########## @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + + +import java.nio.charset.StandardCharsets +import java.util +import java.util.Properties + +import kafka.network.SocketServer +import kafka.security.authorizer.AclAuthorizer +import org.apache.kafka.clients.admin.ScramMechanism +import org.apache.kafka.common.acl.AclOperation +import org.apache.kafka.common.message.{AlterUserScramCredentialsRequestData, DescribeUserScramCredentialsRequestData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterUserScramCredentialsRequest, AlterUserScramCredentialsResponse, DescribeUserScramCredentialsRequest, DescribeUserScramCredentialsResponse} +import org.apache.kafka.common.resource.ResourceType +import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder} +import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult} +import org.junit.Assert._ +import org.junit.{Before, Test} + +import scala.jdk.CollectionConverters._ + +/** + * Test 
AlterUserScramCredentialsRequest/Response API for the cases where either no credentials are altered + * or failure is expected due to lack of authorization, sending the request to a non-controller broker, or some other issue. + * Also tests the Alter and Describe APIs for the case where credentials are successfully altered/described. + */ +class AlterUserScramCredentialsRequestTest extends BaseRequestTest { + override def brokerPropertyOverrides(properties: Properties): Unit = { + properties.put(KafkaConfig.ControlledShutdownEnableProp, "false") + properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AlterCredentialsTest.TestAuthorizer].getName) + properties.put(KafkaConfig.PrincipalBuilderClassProp, classOf[AlterCredentialsTest.TestPrincipalBuilder].getName) + } + + @Before + override def setUp(): Unit = { + AlterCredentialsTest.principal = KafkaPrincipal.ANONYMOUS // default is to be authorized + super.setUp() + } + + @Test + def testAlterNothing(): Unit = { + val request = new AlterUserScramCredentialsRequest.Builder( + new AlterUserScramCredentialsRequestData() + .setDeletions(new util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialDeletion]) + .setUpsertions(new util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialUpsertion])).build() + val response = sendAlterUserScramCredentialsRequest(request) + + val results = response.data.results + assertEquals(0, results.size) + } + + @Test + def testAlterNothingNotAuthorized(): Unit = { + AlterCredentialsTest.principal = AlterCredentialsTest.UnauthorizedPrincipal + + val request = new AlterUserScramCredentialsRequest.Builder( + new AlterUserScramCredentialsRequestData() + .setDeletions(new util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialDeletion]) + .setUpsertions(new util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialUpsertion])).build() + val response = sendAlterUserScramCredentialsRequest(request) + + val results = response.data.results + 
assertEquals(0, results.size) + } + + @Test + def testAlterSomethingNotAuthorized(): Unit = { + AlterCredentialsTest.principal = AlterCredentialsTest.UnauthorizedPrincipal + + val request = new AlterUserScramCredentialsRequest.Builder( + new AlterUserScramCredentialsRequestData() + .setDeletions(util.Arrays.asList(new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName("name1").setMechanism(ScramMechanism.SCRAM_SHA_256.getType))) + .setUpsertions(util.Arrays.asList(new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName("name2").setMechanism(ScramMechanism.SCRAM_SHA_512.getType)))).build() + val response = sendAlterUserScramCredentialsRequest(request) + + val results = response.data.results + assertEquals(2, results.size) + assertTrue("Expected not authorized", + results.get(0).errorCode == Errors.CLUSTER_AUTHORIZATION_FAILED.code && results.get(1).errorCode == Errors.CLUSTER_AUTHORIZATION_FAILED.code) + } + + @Test + def testAlterSameThingTwice(): Unit = { + val deletion1 = new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName("name1").setMechanism(ScramMechanism.SCRAM_SHA_256.getType) + val deletion2 = new AlterUserScramCredentialsRequestData.ScramCredentialDeletion().setName("name2").setMechanism(ScramMechanism.SCRAM_SHA_256.getType) + val upsertion1 = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName("name1").setMechanism(ScramMechanism.SCRAM_SHA_256.getType) + .setIterations(-1).setSalt("salt".getBytes).setSaltedPassword("saltedPassword".getBytes) + val upsertion2 = new AlterUserScramCredentialsRequestData.ScramCredentialUpsertion().setName("name2").setMechanism(ScramMechanism.SCRAM_SHA_256.getType) + .setIterations(-1).setSalt("salt".getBytes).setSaltedPassword("saltedPassword".getBytes) + val requests = List ( + new AlterUserScramCredentialsRequest.Builder( + new AlterUserScramCredentialsRequestData() + .setDeletions(util.Arrays.asList(deletion1, deletion1)) + 
.setUpsertions(util.Arrays.asList(upsertion2, upsertion2))).build(), + new AlterUserScramCredentialsRequest.Builder( + new AlterUserScramCredentialsRequestData() + .setDeletions(util.Arrays.asList(deletion1, deletion2)) + .setUpsertions(util.Arrays.asList(upsertion1, upsertion2))).build(), + ) + requests.foreach(request => { + val response = sendAlterUserScramCredentialsRequest(request) + val results = response.data.results + assertEquals(2, results.size) + assertTrue("Expected error when altering the same credential twice in a single request", + results.get(0).errorCode == Errors.INVALID_REQUEST.code && results.get(1).errorCode == Errors.INVALID_REQUEST.code) Review comment: assertEquals so we know what the error was if test fails? (multiple places) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org